jiangxin369 commented on code in PR #218:
URL: https://github.com/apache/flink-ml/pull/218#discussion_r1133589007
##########
flink-ml-core/src/test/java/org/apache/flink/ml/api/PipelineTest.java:
##########
@@ -95,4 +109,59 @@ public void testPipeline() throws Exception {
// Executes the loaded Pipeline and verifies that it produces the
expected output.
TestUtils.executeAndCheckOutput(env, loadedEstimator, inputs, output,
null, null);
}
+
+ @Test
+ public void testSupportServable() {
+ SumEstimator estimatorA = new SumEstimator();
+ UnionAlgoOperator algoOperatorA = new UnionAlgoOperator();
+ SumModel modelA = new SumModel();
+ SumModel modelB = new SumModel();
+
+ List<Stage<?>> stages = Arrays.asList(modelA, modelB);
+ PipelineModel pipelineModel = new PipelineModel(stages);
+ assertTrue(pipelineModel.supportServable());
+
+ stages = Arrays.asList(estimatorA, modelA);
+ pipelineModel = new PipelineModel(stages);
+ assertFalse(pipelineModel.supportServable());
+
+ stages = Arrays.asList(algoOperatorA, modelA);
+ pipelineModel = new PipelineModel(stages);
+ assertFalse(pipelineModel.supportServable());
+ }
+
+ @Test
+ public void testLoadServable() throws Exception {
+ SumModel modelA = new SumModel().setModelData(tEnv.fromValues(10));
+ SumModel modelB = new SumModel().setModelData(tEnv.fromValues(20));
+ SumModel modelC = new SumModel().setModelData(tEnv.fromValues(30));
+
+ List<Stage<?>> stages = Arrays.asList(modelA, modelB, modelC);
+ Model<?> model = new PipelineModel(stages);
+
+ PipelineModelServable servable =
+ saveAndLoadServable(tEnv, model,
tempFolder.newFolder().getAbsolutePath());
Review Comment:
I agree with the solution and I've updated the PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]