This is an automated email from the ASF dual-hosted git repository.

shaoxuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3050957  [FLINK-12473][ml] Add ML pipeline and MLlib interface
3050957 is described below

commit 305095743ffe0bc39f76c1bda983da7d0df9e003
Author: Gen Luo <[email protected]>
AuthorDate: Fri May 24 17:05:23 2019 +0800

    [FLINK-12473][ml] Add ML pipeline and MLlib interface
    
    This closes #8402
---
 flink-ml-parent/flink-ml-api/pom.xml               |  45 ++++
 .../org/apache/flink/ml/api/core/Estimator.java    |  47 ++++
 .../java/org/apache/flink/ml/api/core/Model.java   |  39 +++
 .../org/apache/flink/ml/api/core/Pipeline.java     | 266 +++++++++++++++++++++
 .../apache/flink/ml/api/core/PipelineStage.java    |  56 +++++
 .../org/apache/flink/ml/api/core/Transformer.java  |  42 ++++
 .../apache/flink/ml/api/misc/param/ParamInfo.java  | 130 ++++++++++
 .../flink/ml/api/misc/param/ParamInfoFactory.java  | 129 ++++++++++
 .../flink/ml/api/misc/param/ParamValidator.java    |  39 +++
 .../org/apache/flink/ml/api/misc/param/Params.java | 151 ++++++++++++
 .../apache/flink/ml/api/misc/param/WithParams.java |  60 +++++
 .../flink/ml/util/param/ExtractParamInfosUtil.java |  73 ++++++
 .../org/apache/flink/ml/api/core/PipelineTest.java | 181 ++++++++++++++
 .../org/apache/flink/ml/api/misc/ParamsTest.java   |  72 ++++++
 .../ml/util/param/ExtractParamInfosUtilTest.java   | 104 ++++++++
 flink-ml-parent/pom.xml                            |  39 +++
 pom.xml                                            |   1 +
 17 files changed, 1474 insertions(+)

diff --git a/flink-ml-parent/flink-ml-api/pom.xml 
b/flink-ml-parent/flink-ml-api/pom.xml
new file mode 100644
index 0000000..f77f068
--- /dev/null
+++ b/flink-ml-parent/flink-ml-api/pom.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-ml-parent</artifactId>
+               <version>1.9-SNAPSHOT</version>
+       </parent>
+
+       <artifactId>flink-ml-api</artifactId>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-api-java</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-shaded-jackson</artifactId>
+                       
<version>${jackson.version}-${flink.shaded.version}</version>
+               </dependency>
+       </dependencies>
+</project>
diff --git 
a/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/core/Estimator.java
 
b/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/core/Estimator.java
new file mode 100644
index 0000000..8e31d94
--- /dev/null
+++ 
b/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/core/Estimator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.api.core;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+
+/**
+ * Estimators are {@link PipelineStage}s responsible for training and 
generating machine learning
+ * models.
+ *
+ * <p>The implementations are expected to take an input table as training 
samples and generate a
+ * {@link Model} which fits these samples.
+ *
+ * @param <E> class type of the Estimator implementation itself, used by {@link
+ *            org.apache.flink.ml.api.misc.param.WithParams}.
+ * @param <M> class type of the {@link Model} this Estimator produces.
+ */
+@PublicEvolving
+public interface Estimator<E extends Estimator<E, M>, M extends Model<M>> 
extends PipelineStage<E> {
+
+       /**
+        * Train and produce a {@link Model} which fits the records in the 
given {@link Table}.
+        *
+        * @param tEnv  the table environment to which the input table is bound.
+        * @param input the table with records to train the Model.
+        * @return a model trained to fit on the given Table.
+        */
+       M fit(TableEnvironment tEnv, Table input);
+}
diff --git 
a/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/core/Model.java
 
b/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/core/Model.java
new file mode 100644
index 0000000..b52b6a9
--- /dev/null
+++ 
b/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/core/Model.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.api.core;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.Table;
+
+/**
+ * A model is an ordinary {@link Transformer} except how it is created. While 
ordinary transformers
+ * are defined by specifying the parameters directly, a model is usually 
generated by an {@link
+ * Estimator} when {@link 
Estimator#fit(org.apache.flink.table.api.TableEnvironment, Table)} is
+ * invoked.
+ *
+ * <p>We separate Model from {@link Transformer} in order to support potential
+ * model specific logic such as linking a Model to the {@link Estimator} from 
which the model was
+ * generated.
+ *
+ * @param <M> The class type of the Model implementation itself, used by {@link
+ *            org.apache.flink.ml.api.misc.param.WithParams}
+ */
+@PublicEvolving
+public interface Model<M extends Model<M>> extends Transformer<M> {
+}
diff --git 
a/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/core/Pipeline.java
 
b/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/core/Pipeline.java
new file mode 100644
index 0000000..c8326c4
--- /dev/null
+++ 
b/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/core/Pipeline.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.api.core;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.util.InstantiationUtil;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A pipeline is a linear workflow which chains {@link Estimator}s and {@link 
Transformer}s to
+ * execute an algorithm.
+ *
+ * <p>A pipeline itself can either act as an Estimator or a Transformer, 
depending on the stages it
+ * includes. More specifically:
+ * <ul>
+ * <li>
+ * If a Pipeline has an {@link Estimator}, one needs to call {@link 
Pipeline#fit(TableEnvironment,
+ * Table)} before use the pipeline as a {@link Transformer} . In this case the 
Pipeline is an {@link
+ * Estimator} and can produce a Pipeline as a {@link Model}.
+ * </li>
+ * <li>
+ * If a Pipeline has no {@link Estimator}, it is a {@link Transformer} and can 
be applied to a Table
+ * directly. In this case, {@link Pipeline#fit(TableEnvironment, Table)} will 
simply return the
+ * pipeline itself.
+ * </li>
+ * </ul>
+ *
+ * <p>In addition, a pipeline can also be used as a {@link PipelineStage} in 
another pipeline, just
+ * like an ordinary {@link Estimator} or {@link Transformer} as describe above.
+ */
+@PublicEvolving
+public final class Pipeline implements Estimator<Pipeline, Pipeline>, 
Transformer<Pipeline>,
+       Model<Pipeline> {
+       private static final long serialVersionUID = 1L;
+       private final List<PipelineStage> stages = new ArrayList<>();
+       private final Params params = new Params();
+
+       private int lastEstimatorIndex = -1;
+
+       public Pipeline() {
+       }
+
+       public Pipeline(String pipelineJson) {
+               this.loadJson(pipelineJson);
+       }
+
+       public Pipeline(List<PipelineStage> stages) {
+               for (PipelineStage s : stages) {
+                       appendStage(s);
+               }
+       }
+
+       //is the stage a simple Estimator or pipeline with Estimator
+       private static boolean isStageNeedFit(PipelineStage stage) {
+               return (stage instanceof Pipeline && ((Pipeline) 
stage).needFit()) ||
+                       (!(stage instanceof Pipeline) && stage instanceof 
Estimator);
+       }
+
+       /**
+        * Appends a PipelineStage to the tail of this pipeline. Pipeline is 
editable only via this
+        * method. The PipelineStage must be Estimator, Transformer, Model or 
Pipeline.
+        *
+        * @param stage the stage to be appended
+        */
+       public Pipeline appendStage(PipelineStage stage) {
+               if (isStageNeedFit(stage)) {
+                       lastEstimatorIndex = stages.size();
+               } else if (!(stage instanceof Transformer)) {
+                       throw new RuntimeException(
+                               "All PipelineStages should be Estimator or 
Transformer, got:" +
+                                       stage.getClass().getSimpleName());
+               }
+               stages.add(stage);
+               return this;
+       }
+
+       /**
+        * Returns a list of all stages in this pipeline in order, the list is 
immutable.
+        *
+        * @return an immutable list of all stages in this pipeline in order.
+        */
+       public List<PipelineStage> getStages() {
+               return Collections.unmodifiableList(stages);
+       }
+
+       /**
+        * Check whether the pipeline acts as an {@link Estimator} or not. When 
the return value is
+        * true, that means this pipeline contains an {@link Estimator} and 
thus users must invoke
+        * {@link #fit(TableEnvironment, Table)} before they can use this 
pipeline as a {@link
+        * Transformer}. Otherwise, the pipeline can be used as a {@link 
Transformer} directly.
+        *
+        * @return {@code true} if this pipeline has an Estimator, {@code 
false} otherwise
+        */
+       public boolean needFit() {
+               return this.getIndexOfLastEstimator() >= 0;
+       }
+
+       public Params getParams() {
+               return params;
+       }
+
+       //find the last Estimator or Pipeline that needs fit in stages, -1 
stand for no Estimator in Pipeline
+       private int getIndexOfLastEstimator() {
+               return lastEstimatorIndex;
+       }
+
+       /**
+        * Train the pipeline to fit on the records in the given {@link Table}.
+        *
+        * <p>This method go through all the {@link PipelineStage}s in order 
and does the following
+        * on each stage until the last {@link Estimator}(inclusive).
+        *
+        * <ul>
+        * <li>
+        * If a stage is an {@link Estimator}, invoke {@link 
Estimator#fit(TableEnvironment, Table)}
+        * with the input table to generate a {@link Model}, transform the the 
input table with the
+        * generated {@link Model} to get a result table, then pass the result 
table to the next stage
+        * as input.
+        * </li>
+        * <li>
+        * If a stage is a {@link Transformer}, invoke {@link 
Transformer#transform(TableEnvironment,
+        * Table)} on the input table to get a result table, and pass the 
result table to the next stage
+        * as input.
+        * </li>
+        * </ul>
+        *
+        * <p>After all the {@link Estimator}s are trained to fit their input 
tables, a new
+        * pipeline will be created with the same stages in this pipeline, 
except that all the
+        * Estimators in the new pipeline are replaced with their corresponding 
Models generated in the
+        * above process.
+        *
+        * <p>If there is no {@link Estimator} in the pipeline, the method 
returns a copy of this
+        * pipeline.
+        *
+        * @param tEnv  the table environment to which the input table is bound.
+        * @param input the table with records to train the Pipeline.
+        * @return a pipeline with same stages as this Pipeline except all 
Estimators replaced with
+        * their corresponding Models.
+        */
+       @Override
+       public Pipeline fit(TableEnvironment tEnv, Table input) {
+               List<PipelineStage> transformStages = new 
ArrayList<>(stages.size());
+               int lastEstimatorIdx = getIndexOfLastEstimator();
+               for (int i = 0; i < stages.size(); i++) {
+                       PipelineStage s = stages.get(i);
+                       if (i <= lastEstimatorIdx) {
+                               Transformer t;
+                               boolean needFit = isStageNeedFit(s);
+                               if (needFit) {
+                                       t = ((Estimator) s).fit(tEnv, input);
+                               } else {
+                                       // stage is Transformer, guaranteed in 
appendStage() method
+                                       t = (Transformer) s;
+                               }
+                               transformStages.add(t);
+                               input = t.transform(tEnv, input);
+                       } else {
+                               transformStages.add(s);
+                       }
+               }
+               return new Pipeline(transformStages);
+       }
+
+       /**
+        * Generate a result table by applying all the stages in this pipeline 
to the input table in
+        * order.
+        *
+        * @param tEnv  the table environment to which the input table is bound.
+        * @param input the table to be transformed
+        * @return a result table with all the stages applied to the input 
tables in order.
+        */
+       @Override
+       public Table transform(TableEnvironment tEnv, Table input) {
+               if (needFit()) {
+                       throw new RuntimeException("Pipeline contains 
Estimator, need to fit first.");
+               }
+               for (PipelineStage s : stages) {
+                       input = ((Transformer) s).transform(tEnv, input);
+               }
+               return input;
+       }
+
+       @Override
+       public String toJson() {
+               ObjectMapper mapper = new ObjectMapper();
+
+               List<Map<String, String>> stageJsons = new ArrayList<>();
+               for (PipelineStage s : getStages()) {
+                       Map<String, String> stageMap = new HashMap<>();
+                       stageMap.put("stageClassName", 
s.getClass().getTypeName());
+                       stageMap.put("stageJson", s.toJson());
+                       stageJsons.add(stageMap);
+               }
+
+               try {
+                       return mapper.writeValueAsString(stageJsons);
+               } catch (JsonProcessingException e) {
+                       throw new RuntimeException("Failed to serialize 
pipeline", e);
+               }
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       public void loadJson(String json) {
+               ObjectMapper mapper = new ObjectMapper();
+               List<Map<String, String>> stageJsons;
+               try {
+                       stageJsons = mapper.readValue(json, List.class);
+               } catch (IOException e) {
+                       throw new RuntimeException("Failed to deserialize 
pipeline json:" + json, e);
+               }
+               for (Map<String, String> stageMap : stageJsons) {
+                       appendStage(restoreInnerStage(stageMap));
+               }
+       }
+
+       private PipelineStage<?> restoreInnerStage(Map<String, String> 
stageMap) {
+               String className = stageMap.get("stageClassName");
+               Class<?> clz;
+               try {
+                       clz = Class.forName(className);
+               } catch (ClassNotFoundException e) {
+                       throw new RuntimeException("PipelineStage class " + 
className + " not exists", e);
+               }
+               InstantiationUtil.checkForInstantiation(clz);
+
+               PipelineStage<?> s;
+               try {
+                       s = (PipelineStage<?>) clz.newInstance();
+               } catch (Exception e) {
+                       throw new RuntimeException("Class is instantiable but 
failed to new an instance", e);
+               }
+
+               String stageJson = stageMap.get("stageJson");
+               s.loadJson(stageJson);
+               return s;
+       }
+}
diff --git 
a/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/core/PipelineStage.java
 
b/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/core/PipelineStage.java
new file mode 100644
index 0000000..86bf0d3
--- /dev/null
+++ 
b/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/core/PipelineStage.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.api.core;
+
+import org.apache.flink.ml.api.misc.param.ParamInfo;
+import org.apache.flink.ml.api.misc.param.WithParams;
+import org.apache.flink.ml.util.param.ExtractParamInfosUtil;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base class for a stage in a pipeline. The interface is only a concept, and 
does not have any
+ * actual functionality. Its subclasses must be either Estimator or 
Transformer. No other classes
+ * should inherit this interface directly.
+ *
+ * <p>Each pipeline stage is with parameters, and requires a public empty 
constructor for
+ * restoration in Pipeline.
+ *
+ * @param <T> The class type of the PipelineStage implementation itself, used 
by {@link
+ *            org.apache.flink.ml.api.misc.param.WithParams}
+ * @see WithParams
+ */
+interface PipelineStage<T extends PipelineStage<T>> extends WithParams<T>, 
Serializable {
+
+       default String toJson() {
+               return getParams().toJson();
+       }
+
+       default void loadJson(String json) {
+               List<ParamInfo> paramInfos = 
ExtractParamInfosUtil.extractParamInfos(this);
+               Map<String, Class<?>> classMap = new HashMap<>();
+               for (ParamInfo i : paramInfos) {
+                       classMap.put(i.getName(), i.getValueClass());
+               }
+               getParams().loadJson(json, classMap);
+       }
+}
diff --git 
a/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/core/Transformer.java
 
b/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/core/Transformer.java
new file mode 100644
index 0000000..9435a61
--- /dev/null
+++ 
b/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/core/Transformer.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.api.core;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+
+/**
+ * A transformer is a {@link PipelineStage} that transforms an input {@link 
Table} to a result
+ * {@link Table}.
+ *
+ * @param <T> The class type of the Transformer implementation itself, used by 
{@link
+ *            org.apache.flink.ml.api.misc.param.WithParams}
+ */
+@PublicEvolving
+public interface Transformer<T extends Transformer<T>> extends 
PipelineStage<T> {
+       /**
+        * Applies the transformer on the input table, and returns the result 
table.
+        *
+        * @param tEnv  the table environment to which the input table is bound.
+        * @param input the table to be transformed
+        * @return the transformed table
+        */
+       Table transform(TableEnvironment tEnv, Table input);
+}
diff --git 
a/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/ParamInfo.java
 
b/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/ParamInfo.java
new file mode 100644
index 0000000..994576f
--- /dev/null
+++ 
b/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/ParamInfo.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.api.misc.param;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Definition of a parameter, including name, type, default value, validator 
and so on.
+ *
+ * <p>This class is provided to unify the interaction with parameters.
+ *
+ * @param <V> the type of the param value
+ */
+@PublicEvolving
+public class ParamInfo<V> {
+       private final String name;
+       private final String[] alias;
+       private final String description;
+       private final boolean isOptional;
+       private final boolean hasDefaultValue;
+       private final V defaultValue;
+       private final ParamValidator<V> validator;
+       private final Class<V> valueClass;
+
+       ParamInfo(String name, String[] alias, String description, boolean 
isOptional,
+                       boolean hasDefaultValue, V defaultValue,
+                       ParamValidator<V> validator, Class<V> valueClass) {
+               this.name = name;
+               this.alias = alias;
+               this.description = description;
+               this.isOptional = isOptional;
+               this.hasDefaultValue = hasDefaultValue;
+               this.defaultValue = defaultValue;
+               this.validator = validator;
+               this.valueClass = valueClass;
+       }
+
+       /**
+        * Returns the name of the parameter. The name must be unique in the 
stage the ParamInfo
+        * belongs to.
+        *
+        * @return the name of the parameter
+        */
+       public String getName() {
+               return name;
+       }
+
+       /**
+        * Returns the aliases of the parameter. The alias will be an empty 
string array by default.
+        *
+        * @return the aliases of the parameter
+        */
+       public String[] getAlias() {
+               return alias;
+       }
+
+       /**
+        * Returns the description of the parameter.
+        *
+        * @return the description of the parameter
+        */
+       public String getDescription() {
+               return description;
+       }
+
+       /**
+        * Returns whether the parameter is optional.
+        *
+        * @return {@code true} if the param is optional, {@code false} 
otherwise
+        */
+       public boolean isOptional() {
+               return isOptional;
+       }
+
+       /**
+        * Returns whether the parameter has a default value. Since {@code 
null} may also be a valid
+        * default value of a parameter, the return of getDefaultValue may be 
{@code null} even when
+        * this method returns true.
+        *
+        * @return {@code true} if the param is has a default value(even if 
it's a {@code null}), {@code
+        * false} otherwise
+        */
+       public boolean hasDefaultValue() {
+               return hasDefaultValue;
+       }
+
+       /**
+        * Returns the default value of the parameter. The default value should 
be defined whenever
+        * possible. The default value can be a {@code null} even if 
hasDefaultValue returns true.
+        *
+        * @return the default value of the param, {@code null} if not defined
+        */
+       public V getDefaultValue() {
+               return defaultValue;
+       }
+
+       /**
+        * Returns the validator to validate the value of the parameter.
+        *
+        * @return the validator to validate the value of the parameter.
+        */
+       public ParamValidator<V> getValidator() {
+               return validator;
+       }
+
+       /**
+        * Returns the class of the param value. It's usually needed in 
serialization.
+        *
+        * @return the class of the param value
+        */
+       public Class<V> getValueClass() {
+               return valueClass;
+       }
+}
diff --git 
a/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/ParamInfoFactory.java
 
b/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/ParamInfoFactory.java
new file mode 100644
index 0000000..4c49580
--- /dev/null
+++ 
b/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/ParamInfoFactory.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.api.misc.param;
+
+/**
+ * Factory to create ParamInfo, all ParamInfos should be created via this 
class.
+ */
+public class ParamInfoFactory {
+       /**
+        * Returns a ParamInfoBuilder to configure and build a new ParamInfo.
+        *
+        * @param name       name of the new ParamInfo
+        * @param valueClass value class of the new ParamInfo
+        * @param <V>        value type of the new ParamInfo
+        * @return a ParamInfoBuilder
+        */
+       public static <V> ParamInfoBuilder<V> createParamInfo(String name, 
Class<V> valueClass) {
+               return new ParamInfoBuilder<>(name, valueClass);
+       }
+
+       /**
+        * Builder to build a new ParamInfo. Builder is created by 
ParamInfoFactory with name and
+        * valueClass set.
+        *
+        * @param <V> value type of the new ParamInfo
+        */
+       public static class ParamInfoBuilder<V> {
+               private String name;
+               private String[] alias = new String[0];
+               private String description;
+               private boolean isOptional = true;
+               private boolean hasDefaultValue = false;
+               private V defaultValue;
+               private ParamValidator<V> validator;
+               private Class<V> valueClass;
+
+               ParamInfoBuilder(String name, Class<V> valueClass) {
+                       this.name = name;
+                       this.valueClass = valueClass;
+               }
+
+               /**
+                * Sets the aliases of the parameter.
+                *
+                * @return the builder itself
+                */
+               public ParamInfoBuilder<V> setAlias(String[] alias) {
+                       this.alias = alias;
+                       return this;
+               }
+
+               /**
+                * Sets the description of the parameter.
+                *
+                * @return the builder itself
+                */
+               public ParamInfoBuilder<V> setDescription(String description) {
+                       this.description = description;
+                       return this;
+               }
+
+               /**
+                * Sets the flag indicating the parameter is optional. The 
parameter is optional by default.
+                *
+                * @return the builder itself
+                */
+               public ParamInfoBuilder<V> setOptional() {
+                       this.isOptional = true;
+                       return this;
+               }
+
+               /**
+                * Sets the flag indicating the parameter is required.
+                *
+                * @return the builder itself
+                */
+               public ParamInfoBuilder<V> setRequired() {
+                       this.isOptional = false;
+                       return this;
+               }
+
+               /**
+                * Sets the flag indicating the parameter has default value, 
and sets the default value.
+                *
+                * @return the builder itself
+                */
+               public ParamInfoBuilder<V> setHasDefaultValue(V defaultValue) {
+                       this.hasDefaultValue = true;
+                       this.defaultValue = defaultValue;
+                       return this;
+               }
+
+               /**
+                * Sets the validator to validate the parameter value set by 
users.
+                *
+                * @return the builder itself
+                */
+               public ParamInfoBuilder<V> setValidator(ParamValidator<V> 
validator) {
+                       this.validator = validator;
+                       return this;
+               }
+
+               /**
+                * Builds the defined ParamInfo and returns it. The ParamInfo 
will be immutable.
+                *
+                * @return the defined ParamInfo
+                */
+               public ParamInfo<V> build() {
+                       return new ParamInfo<>(name, alias, description, 
isOptional, hasDefaultValue,
+                               defaultValue, validator, valueClass);
+               }
+       }
+}
diff --git 
a/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/ParamValidator.java
 
b/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/ParamValidator.java
new file mode 100644
index 0000000..c95b146
--- /dev/null
+++ 
b/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/ParamValidator.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.api.misc.param;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+
+/**
+ * An interface used by {@link ParamInfo} to do validation when a parameter 
value is set.
+ *
+ * @param <V> the type of the value to validate
+ */
+@PublicEvolving
+public interface ParamValidator<V> extends Serializable {
+       /**
+        * Validates a parameter value.
+        *
+        * @param value value to validate
+        * @return {@code true} if the value is valid, {@code false} otherwise
+        */
+       boolean validate(V value);
+}
diff --git 
a/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/Params.java
 
b/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/Params.java
new file mode 100644
index 0000000..0c1e0d8
--- /dev/null
+++ 
b/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/Params.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.api.misc.param;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The map-like container class for parameter. This class is provided to unify 
the interaction with
+ * parameters.
+ */
+@PublicEvolving
+public class Params implements Serializable {
+       private final Map<String, Object> paramMap = new HashMap<>();
+
+       /**
+        * Returns the value of the specific parameter, or default value 
defined in the {@code info} if
+        * this Params doesn't contain the param.
+        *
+        * @param info the info of the specific parameter, usually with default 
value
+        * @param <V>  the type of the specific parameter
+        * @return the value of the specific parameter, or default value 
defined in the {@code info} if
+        * this Params doesn't contain the parameter
+        * @throws RuntimeException if the Params doesn't contains the specific 
parameter, while the
+        *                          param is not optional but has no default 
value in the {@code info}
+        */
+       @SuppressWarnings("unchecked")
+       public <V> V get(ParamInfo<V> info) {
+               V value = (V) paramMap.getOrDefault(info.getName(), 
info.getDefaultValue());
+               if (value == null && !info.isOptional() && 
!info.hasDefaultValue()) {
+                       throw new RuntimeException(info.getName() +
+                               " not exist which is not optional and don't 
have a default value");
+               }
+               return value;
+       }
+
+       /**
+        * Set the value of the specific parameter.
+        *
+        * @param info  the info of the specific parameter to set.
+        * @param value the value to be set to the specific parameter.
+        * @param <V>   the type of the specific parameter.
+        * @return the previous value of the specific parameter, or null if 
this Params didn't contain
+        * the parameter before
+        * @throws RuntimeException if the {@code info} has a validator and the 
{@code value} is
+        *                          evaluated as illegal by the validator
+        */
+       public <V> Params set(ParamInfo<V> info, V value) {
+               if (!info.isOptional() && value == null) {
+                       throw new RuntimeException(
+                               "Setting " + info.getName() + " as null while 
it's not a optional param");
+               }
+               if (value == null) {
+                       remove(info);
+                       return this;
+               }
+
+               if (info.getValidator() != null && 
!info.getValidator().validate(value)) {
+                       throw new RuntimeException(
+                               "Setting " + info.getName() + " as a invalid 
value:" + value);
+               }
+               paramMap.put(info.getName(), value);
+               return this;
+       }
+
+       /**
+        * Removes the specific parameter from this Params.
+        *
+        * @param info the info of the specific parameter to remove
+        * @param <V>  the type of the specific parameter
+        */
+       public <V> void remove(ParamInfo<V> info) {
+               paramMap.remove(info.getName());
+       }
+
+       /**
+        * Creates and returns a deep clone of this Params.
+        *
+        * @return a deep clone of this Params
+        */
+       public Params clone() {
+               Params newParams = new Params();
+               newParams.paramMap.putAll(this.paramMap);
+               return newParams;
+       }
+
+       /**
+        * Returns a json containing all parameters in this Params. The json 
should be human-readable if
+        * possible.
+        *
+        * @return a json containing all parameters in this Params
+        */
+       public String toJson() {
+               ObjectMapper mapper = new ObjectMapper();
+               Map<String, String> stringMap = new HashMap<>();
+               try {
+                       for (Map.Entry<String, Object> e : paramMap.entrySet()) 
{
+                               stringMap.put(e.getKey(), 
mapper.writeValueAsString(e.getValue()));
+                       }
+                       return mapper.writeValueAsString(stringMap);
+               } catch (JsonProcessingException e) {
+                       throw new RuntimeException("Failed to serialize params 
to json", e);
+               }
+       }
+
+       /**
+        * Restores the parameters from the given json. The parameters should 
be exactly the same with
+        * the one who was serialized to the input json after the restoration. 
The class mapping of the
+        * parameters in the json is required because it is hard to directly 
restore a param of a user
+        * defined type. Params will be treated as String if it doesn't exist 
in the {@code classMap}.
+        *
+        * @param json     the json String to restore from
+        * @param classMap the classes of the parameters contained in the json
+        */
+       @SuppressWarnings("unchecked")
+       public void loadJson(String json, Map<String, Class<?>> classMap) {
+               ObjectMapper mapper = new ObjectMapper();
+               try {
+                       Map<String, String> m = mapper.readValue(json, 
Map.class);
+                       for (Map.Entry<String, String> e : m.entrySet()) {
+                               Class<?> valueClass = 
classMap.getOrDefault(e.getKey(), String.class);
+                               paramMap.put(e.getKey(), 
mapper.readValue(e.getValue(), valueClass));
+                       }
+               } catch (IOException e) {
+                       throw new RuntimeException("Failed to deserialize 
json:" + json, e);
+               }
+       }
+}
diff --git 
a/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/WithParams.java
 
b/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/WithParams.java
new file mode 100644
index 0000000..5e87508
--- /dev/null
+++ 
b/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/WithParams.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.api.misc.param;
+
+/**
+ * Parameters are widely used in machine learning realm. This class defines a 
common interface to
+ * interact with classes with parameters.
+ *
+ * @param <T> the actual type of this WithParams, as the return type of setter
+ */
+public interface WithParams<T> {
+       /**
+        * Returns the all the parameters.
+        *
+        * @return all the parameters.
+        */
+       Params getParams();
+
+       /**
+        * Set the value of a specific parameter.
+        *
+        * @param info  the info of the specific param to set
+        * @param value the value to be set to the specific param
+        * @param <V>   the type of the specific param
+        * @return the WithParams itself
+        */
+       @SuppressWarnings("unchecked")
+       default <V> T set(ParamInfo<V> info, V value) {
+               getParams().set(info, value);
+               return (T) this;
+       }
+
+       /**
+        * Returns the value of the specific param.
+        *
+        * @param info the info of the specific param, usually with default 
value
+        * @param <V>  the type of the specific param
+        * @return the value of the specific param, or default value defined in 
the {@code info} if the
+        * inner Params doesn't contains this param
+        */
+       default <V> V get(ParamInfo<V> info) {
+               return getParams().get(info);
+       }
+}
diff --git 
a/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/util/param/ExtractParamInfosUtil.java
 
b/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/util/param/ExtractParamInfosUtil.java
new file mode 100644
index 0000000..c59d2e0
--- /dev/null
+++ 
b/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/util/param/ExtractParamInfosUtil.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.util.param;
+
+import org.apache.flink.ml.api.misc.param.ParamInfo;
+import org.apache.flink.ml.api.misc.param.WithParams;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Utility to extract all ParamInfos defined in a WithParams, mainly used in 
persistence.
+ */
+public final class ExtractParamInfosUtil {
+       private static final Logger LOG = 
LoggerFactory.getLogger(ExtractParamInfosUtil.class);
+
+       /**
+        * Extracts all ParamInfos defined in the given WithParams, including 
those in its superclasses
+        * and interfaces.
+        *
+        * @param s the WithParams to extract ParamInfos from
+        * @return the list of all ParamInfos defined in s
+        */
+       public static List<ParamInfo> extractParamInfos(WithParams s) {
+               return extractParamInfos(s, s.getClass());
+       }
+
+       private static List<ParamInfo> extractParamInfos(WithParams s, Class 
clz) {
+               List<ParamInfo> result = new ArrayList<>();
+               if (clz == null) {
+                       return result;
+               }
+
+               Field[] fields = clz.getDeclaredFields();
+               for (Field f : fields) {
+                       f.setAccessible(true);
+                       if (ParamInfo.class.isAssignableFrom(f.getType())) {
+                               try {
+                                       result.add((ParamInfo) f.get(s));
+                               } catch (IllegalAccessException e) {
+                                       LOG.warn("Failed to extract param info 
{}, ignore it", f.getName(), e);
+                               }
+                       }
+               }
+
+               result.addAll(extractParamInfos(s, clz.getSuperclass()));
+               for (Class c : clz.getInterfaces()) {
+                       result.addAll(extractParamInfos(s, c));
+               }
+
+               return result;
+       }
+}
diff --git 
a/flink-ml-parent/flink-ml-api/src/test/java/org/apache/flink/ml/api/core/PipelineTest.java
 
b/flink-ml-parent/flink-ml-api/src/test/java/org/apache/flink/ml/api/core/PipelineTest.java
new file mode 100644
index 0000000..fc82634
--- /dev/null
+++ 
b/flink-ml-parent/flink-ml-api/src/test/java/org/apache/flink/ml/api/core/PipelineTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.api.core;
+
+import org.apache.flink.ml.api.misc.param.ParamInfo;
+import org.apache.flink.ml.api.misc.param.ParamInfoFactory;
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Tests the behavior of {@link Pipeline}.
+ */
+public class PipelineTest {
+       @Rule
+       public ExpectedException thrown = ExpectedException.none();
+
+       @Test
+       public void testPipelineBehavior() {
+               Pipeline pipeline = new Pipeline();
+               pipeline.appendStage(new MockTransformer("a"));
+               pipeline.appendStage(new MockEstimator("b"));
+               pipeline.appendStage(new MockEstimator("c"));
+               pipeline.appendStage(new MockTransformer("d"));
+               assert describePipeline(pipeline).equals("a_b_c_d");
+
+               Pipeline pipelineModel = pipeline.fit(null, null);
+               assert describePipeline(pipelineModel).equals("a_mb_mc_d");
+
+               thrown.expect(RuntimeException.class);
+               thrown.expectMessage("Pipeline contains Estimator, need to fit 
first.");
+               pipeline.transform(null, null);
+       }
+
+       @Test
+       public void testPipelineRestore() {
+               Pipeline pipeline = new Pipeline();
+               pipeline.appendStage(new MockTransformer("a"));
+               pipeline.appendStage(new MockEstimator("b"));
+               pipeline.appendStage(new MockEstimator("c"));
+               pipeline.appendStage(new MockTransformer("d"));
+               String pipelineJson = pipeline.toJson();
+
+               Pipeline restoredPipeline = new Pipeline(pipelineJson);
+               assert describePipeline(restoredPipeline).equals("a_b_c_d");
+
+               Pipeline pipelineModel = pipeline.fit(null, null);
+               String modelJson = pipelineModel.toJson();
+
+               Pipeline restoredPipelineModel = new Pipeline(modelJson);
+               assert 
describePipeline(restoredPipelineModel).equals("a_mb_mc_d");
+       }
+
+       private static String describePipeline(Pipeline p) {
+               StringBuilder res = new StringBuilder();
+               for (PipelineStage s : p.getStages()) {
+                       if (res.length() != 0) {
+                               res.append("_");
+                       }
+                       res.append(((SelfDescribe) s).describe());
+               }
+               return res.toString();
+       }
+
+       /**
+        * Interface to describe a class with a string, only for pipeline test.
+        */
+       private interface SelfDescribe {
+               ParamInfo<String> DESCRIPTION = 
ParamInfoFactory.createParamInfo("description",
+                       String.class).build();
+
+               String describe();
+       }
+
+       /**
+        * Mock estimator for pipeline test.
+        */
+       public static class MockEstimator implements Estimator<MockEstimator, 
MockModel>, SelfDescribe {
+               private final Params params = new Params();
+
+               public MockEstimator() {
+               }
+
+               MockEstimator(String description) {
+                       set(DESCRIPTION, description);
+               }
+
+               @Override
+               public MockModel fit(TableEnvironment tEnv, Table input) {
+                       return new MockModel("m" + describe());
+               }
+
+               @Override
+               public Params getParams() {
+                       return params;
+               }
+
+               @Override
+               public String describe() {
+                       return get(DESCRIPTION);
+               }
+       }
+
+       /**
+        * Mock transformer for pipeline test.
+        */
+       public static class MockTransformer implements 
Transformer<MockTransformer>, SelfDescribe {
+               private final Params params = new Params();
+
+               public MockTransformer() {
+               }
+
+               MockTransformer(String description) {
+                       set(DESCRIPTION, description);
+               }
+
+               @Override
+               public Table transform(TableEnvironment tEnv, Table input) {
+                       return input;
+               }
+
+               @Override
+               public Params getParams() {
+                       return params;
+               }
+
+               @Override
+               public String describe() {
+                       return get(DESCRIPTION);
+               }
+       }
+
+       /**
+        * Mock model for pipeline test.
+        */
+       public static class MockModel implements Model<MockModel>, SelfDescribe 
{
+               private final Params params = new Params();
+
+               public MockModel() {
+               }
+
+               MockModel(String description) {
+                       set(DESCRIPTION, description);
+               }
+
+               @Override
+               public Table transform(TableEnvironment tEnv, Table input) {
+                       return input;
+               }
+
+               @Override
+               public Params getParams() {
+                       return params;
+               }
+
+               @Override
+               public String describe() {
+                       return get(DESCRIPTION);
+               }
+       }
+}
diff --git 
a/flink-ml-parent/flink-ml-api/src/test/java/org/apache/flink/ml/api/misc/ParamsTest.java
 
b/flink-ml-parent/flink-ml-api/src/test/java/org/apache/flink/ml/api/misc/ParamsTest.java
new file mode 100644
index 0000000..8bdf95b
--- /dev/null
+++ 
b/flink-ml-parent/flink-ml-api/src/test/java/org/apache/flink/ml/api/misc/ParamsTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.api.misc;
+
+import org.apache.flink.ml.api.misc.param.ParamInfo;
+import org.apache.flink.ml.api.misc.param.ParamInfoFactory;
+import org.apache.flink.ml.api.misc.param.Params;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Test for the behavior and validator of {@link Params}.
+ */
+public class ParamsTest {
+       @Rule
+       public ExpectedException thrown = ExpectedException.none();
+
+       @Test
+       public void testDefaultBehavior() {
+               Params params = new Params();
+
+               ParamInfo<String> optionalWithoutDefault =
+                       ParamInfoFactory.createParamInfo("a", 
String.class).build();
+               assert params.get(optionalWithoutDefault) == null;
+
+               ParamInfo<String> optionalWithDefault =
+                       ParamInfoFactory.createParamInfo("a", 
String.class).setHasDefaultValue("def").build();
+               assert params.get(optionalWithDefault).equals("def");
+
+               ParamInfo<String> requiredWithDefault =
+                       ParamInfoFactory.createParamInfo("a", 
String.class).setRequired()
+                               .setHasDefaultValue("def").build();
+               assert params.get(requiredWithDefault).equals("def");
+
+               ParamInfo<String> requiredWithoutDefault =
+                       ParamInfoFactory.createParamInfo("a", 
String.class).setRequired().build();
+               thrown.expect(RuntimeException.class);
+               thrown.expectMessage("a not exist which is not optional and 
don't have a default value");
+               params.get(requiredWithoutDefault);
+       }
+
+       @Test
+       public void testValidator() {
+               Params params = new Params();
+
+               ParamInfo<Integer> intParam =
+                       ParamInfoFactory.createParamInfo("a", 
Integer.class).setValidator(i -> i > 0).build();
+               params.set(intParam, 1);
+
+               thrown.expect(RuntimeException.class);
+               thrown.expectMessage("Setting a as a invalid value:0");
+               params.set(intParam, 0);
+       }
+}
diff --git 
a/flink-ml-parent/flink-ml-api/src/test/java/org/apache/flink/ml/util/param/ExtractParamInfosUtilTest.java
 
b/flink-ml-parent/flink-ml-api/src/test/java/org/apache/flink/ml/util/param/ExtractParamInfosUtilTest.java
new file mode 100644
index 0000000..5f467e9
--- /dev/null
+++ 
b/flink-ml-parent/flink-ml-api/src/test/java/org/apache/flink/ml/util/param/ExtractParamInfosUtilTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.util.param;
+
+import org.apache.flink.ml.api.misc.param.ParamInfo;
+import org.apache.flink.ml.api.misc.param.ParamInfoFactory;
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.api.misc.param.WithParams;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Test for {@link ExtractParamInfosUtil}.
+ */
+public class ExtractParamInfosUtilTest {
+
+       @Test
+       public void testExtractParamInfos() {
+               List<ParamInfo> noParamInfos =
+                       ExtractParamInfosUtil.extractParamInfos(new 
WithNoParamInfo());
+               assert noParamInfos.isEmpty();
+
+               List<ParamInfo> classParamInfos =
+                       ExtractParamInfosUtil.extractParamInfos(new 
WithTestParamInfo());
+               assert classParamInfos.size() == 1 && 
classParamInfos.get(0).getName().equals("KSC");
+
+               List<ParamInfo> allParamInfos =
+                       ExtractParamInfosUtil.extractParamInfos(new 
TestParamInfoWithInheritedParamInfos());
+               String[] sortedCorrectParamNames = new String[]{"KCP", "KI", 
"KSC"};
+               assert allParamInfos.size() == 3 && 
Arrays.equals(sortedCorrectParamNames,
+                       
allParamInfos.stream().map(ParamInfo::getName).sorted().toArray(String[]::new));
+       }
+
+       /**
+        * Mock WithParams implementation with no ParamInfo. Only for test.
+        */
+       public static class WithNoParamInfo implements 
WithParams<WithNoParamInfo> {
+
+               @Override
+               public Params getParams() {
+                       return null;
+               }
+       }
+
+       /**
+        * Mock WithParams implementation with one ParamInfo. Only for test.
+        * @param <T> subclass of WithTestParamInfo
+        */
+       public static class WithTestParamInfo<T extends WithTestParamInfo> 
implements WithParams<T> {
+               public static final ParamInfo<String> KSC = ParamInfoFactory
+                       .createParamInfo("KSC", String.class)
+                       .setDescription("key from super class").build();
+
+               @Override
+               public Params getParams() {
+                       return null;
+               }
+       }
+
+       /**
+        * Mock interface extending WithParams with one ParamInfo. Only for 
test.
+        * @param <T> implementation class of InterfaceWithParamInfo
+        */
+       public interface InterfaceWithParamInfo<T extends 
InterfaceWithParamInfo>
+               extends WithParams<T> {
+               ParamInfo<String> KI = ParamInfoFactory.createParamInfo("KI", 
String.class)
+                       .setDescription("key from interface").build();
+       }
+
+       /**
+        * Mock WithParams inheriting ParamInfos from superclass and interface. 
Only for test.
+        */
+       public static class TestParamInfoWithInheritedParamInfos
+               extends WithTestParamInfo<TestParamInfoWithInheritedParamInfos>
+               implements 
InterfaceWithParamInfo<TestParamInfoWithInheritedParamInfos> {
+               private static final ParamInfo<String> KCP = ParamInfoFactory
+                       .createParamInfo("KCP", String.class)
+                       .setDescription("key in the class which is 
private").build();
+
+               @Override
+               public Params getParams() {
+                       return null;
+               }
+       }
+}
diff --git a/flink-ml-parent/pom.xml b/flink-ml-parent/pom.xml
new file mode 100644
index 0000000..6ca2ebc
--- /dev/null
+++ b/flink-ml-parent/pom.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-parent</artifactId>
+               <version>1.9-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-ml-parent</artifactId>
+
+       <packaging>pom</packaging>
+
+       <modules>
+               <module>flink-ml-api</module>
+       </modules>
+</project>
diff --git a/pom.xml b/pom.xml
index 086e452..38b3601 100644
--- a/pom.xml
+++ b/pom.xml
@@ -86,6 +86,7 @@ under the License.
                <module>flink-fs-tests</module>
                <module>flink-docs</module>
                <module>flink-python</module>
+               <module>flink-ml-parent</module>
        </modules>
 
        <properties>

Reply via email to