http://git-wip-us.apache.org/repos/asf/ignite/blob/2dc0d9f7/modules/ml/src/main/java/org/apache/ignite/ml/composition/stacking/StackedDatasetTrainer.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/composition/stacking/StackedDatasetTrainer.java b/modules/ml/src/main/java/org/apache/ignite/ml/composition/stacking/StackedDatasetTrainer.java index bb870cf..e58107d 100644 --- a/modules/ml/src/main/java/org/apache/ignite/ml/composition/stacking/StackedDatasetTrainer.java +++ b/modules/ml/src/main/java/org/apache/ignite/ml/composition/stacking/StackedDatasetTrainer.java @@ -20,7 +20,7 @@ package org.apache.ignite.ml.composition.stacking; import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; -import org.apache.ignite.ml.Model; +import org.apache.ignite.ml.IgniteModel; import org.apache.ignite.ml.dataset.DatasetBuilder; import org.apache.ignite.ml.environment.LearningEnvironmentBuilder; import org.apache.ignite.ml.environment.parallelism.Promise; @@ -53,7 +53,7 @@ import org.apache.ignite.ml.trainers.DatasetTrainer; * @param <O> Type of aggregator output. * @param <L> Type of labels. */ -public class StackedDatasetTrainer<IS, IA, O, AM extends Model<IA, O>, L> +public class StackedDatasetTrainer<IS, IA, O, AM extends IgniteModel<IA, O>, L> extends DatasetTrainer<StackedModel<IS, IA, O, AM>, L> { /** Operator that merges inputs for aggregating model. */ private IgniteBinaryOperator<IA> aggregatingInputMerger; @@ -62,7 +62,7 @@ public class StackedDatasetTrainer<IS, IA, O, AM extends Model<IA, O>, L> private IgniteFunction<IS, IA> submodelInput2AggregatingInputConverter; /** Trainers of submodels with converters from and to {@link Vector}. */ - private List<DatasetTrainer<Model<IS, IA>, L>> submodelsTrainers; + private List<DatasetTrainer<IgniteModel<IS, IA>, L>> submodelsTrainers; /** Aggregating trainer. 
*/ private DatasetTrainer<AM, L> aggregatorTrainer; @@ -86,7 +86,7 @@ public class StackedDatasetTrainer<IS, IA, O, AM extends Model<IA, O>, L> public StackedDatasetTrainer(DatasetTrainer<AM, L> aggregatorTrainer, IgniteBinaryOperator<IA> aggregatingInputMerger, IgniteFunction<IS, IA> submodelInput2AggregatingInputConverter, - List<DatasetTrainer<Model<IS, IA>, L>> submodelsTrainers, + List<DatasetTrainer<IgniteModel<IS, IA>, L>> submodelsTrainers, IgniteFunction<Vector, IS> vector2SubmodelInputConverter, IgniteFunction<IA, Vector> submodelOutput2VectorConverter) { this.aggregatorTrainer = aggregatorTrainer; @@ -215,32 +215,32 @@ public class StackedDatasetTrainer<IS, IA, O, AM extends Model<IA, O>, L> * @return This object. */ @SuppressWarnings({"unchecked"}) - public <M1 extends Model<IS, IA>> StackedDatasetTrainer<IS, IA, O, AM, L> addTrainer( + public <M1 extends IgniteModel<IS, IA>> StackedDatasetTrainer<IS, IA, O, AM, L> addTrainer( DatasetTrainer<M1, L> trainer) { // Unsafely coerce DatasetTrainer<M1, L> to DatasetTrainer<Model<IS, IA>, L>, but we fully control // usages of this unsafely coerced object, on the other hand this makes work with // submodelTrainers easier. 
- submodelsTrainers.add(new DatasetTrainer<Model<IS, IA>, L>() { + submodelsTrainers.add(new DatasetTrainer<IgniteModel<IS, IA>, L>() { /** {@inheritDoc} */ - @Override public <K, V> Model<IS, IA> fit(DatasetBuilder<K, V> datasetBuilder, + @Override public <K, V> IgniteModel<IS, IA> fit(DatasetBuilder<K, V> datasetBuilder, IgniteBiFunction<K, V, Vector> featureExtractor, IgniteBiFunction<K, V, L> lbExtractor) { return trainer.fit(datasetBuilder, featureExtractor, lbExtractor); } /** {@inheritDoc} */ - @Override public <K, V> Model<IS, IA> update(Model<IS, IA> mdl, DatasetBuilder<K, V> datasetBuilder, + @Override public <K, V> IgniteModel<IS, IA> update(IgniteModel<IS, IA> mdl, DatasetBuilder<K, V> datasetBuilder, IgniteBiFunction<K, V, Vector> featureExtractor, IgniteBiFunction<K, V, L> lbExtractor) { - DatasetTrainer<Model<IS, IA>, L> trainer1 = (DatasetTrainer<Model<IS, IA>, L>)trainer; + DatasetTrainer<IgniteModel<IS, IA>, L> trainer1 = (DatasetTrainer<IgniteModel<IS, IA>, L>)trainer; return trainer1.update(mdl, datasetBuilder, featureExtractor, lbExtractor); } /** {@inheritDoc} */ - @Override protected boolean checkState(Model<IS, IA> mdl) { + @Override protected boolean checkState(IgniteModel<IS, IA> mdl) { return true; } /** {@inheritDoc} */ - @Override protected <K, V> Model<IS, IA> updateModel(Model<IS, IA> mdl, DatasetBuilder<K, V> datasetBuilder, + @Override protected <K, V> IgniteModel<IS, IA> updateModel(IgniteModel<IS, IA> mdl, DatasetBuilder<K, V> datasetBuilder, IgniteBiFunction<K, V, Vector> featureExtractor, IgniteBiFunction<K, V, L> lbExtractor) { return null; } @@ -263,11 +263,11 @@ public class StackedDatasetTrainer<IS, IA, O, AM extends Model<IA, O>, L> IgniteBiFunction<K, V, L> lbExtractor) { return runOnSubmodels( ensemble -> { - List<IgniteSupplier<Model<IS, IA>>> res = new ArrayList<>(); + List<IgniteSupplier<IgniteModel<IS, IA>>> res = new ArrayList<>(); for (int i = 0; i < ensemble.size(); i++) { final int j = i; res.add(() -> { - 
DatasetTrainer<Model<IS, IA>, L> trainer = ensemble.get(j); + DatasetTrainer<IgniteModel<IS, IA>, L> trainer = ensemble.get(j); return mdl == null ? trainer.fit(datasetBuilder, featureExtractor, lbExtractor) : trainer.update(mdl.submodels().get(j), datasetBuilder, featureExtractor, lbExtractor); @@ -306,7 +306,7 @@ public class StackedDatasetTrainer<IS, IA, O, AM extends Model<IA, O>, L> * @return {@link StackedModel}. */ private <K, V> StackedModel<IS, IA, O, AM> runOnSubmodels( - IgniteFunction<List<DatasetTrainer<Model<IS, IA>, L>>, List<IgniteSupplier<Model<IS, IA>>>> taskSupplier, + IgniteFunction<List<DatasetTrainer<IgniteModel<IS, IA>, L>>, List<IgniteSupplier<IgniteModel<IS, IA>>>> taskSupplier, IgniteBiFunction<DatasetTrainer<AM, L>, IgniteBiFunction<K, V, Vector>, AM> aggregatorProcessor, IgniteBiFunction<K, V, Vector> featureExtractor) { @@ -322,9 +322,9 @@ public class StackedDatasetTrainer<IS, IA, O, AM extends Model<IA, O>, L> if (aggregatingInputMerger == null) throw new IllegalStateException("Binary operator used to convert outputs of submodels is not specified"); - List<IgniteSupplier<Model<IS, IA>>> mdlSuppliers = taskSupplier.apply(submodelsTrainers); + List<IgniteSupplier<IgniteModel<IS, IA>>> mdlSuppliers = taskSupplier.apply(submodelsTrainers); - List<Model<IS, IA>> subMdls = environment.parallelismStrategy().submit(mdlSuppliers).stream() + List<IgniteModel<IS, IA>> subMdls = environment.parallelismStrategy().submit(mdlSuppliers).stream() .map(Promise::unsafeGet) .collect(Collectors.toList()); @@ -342,7 +342,7 @@ public class StackedDatasetTrainer<IS, IA, O, AM extends Model<IA, O>, L> aggregatingInputMerger, submodelInput2AggregatingInputConverter); - for (Model<IS, IA> subMdl : subMdls) + for (IgniteModel<IS, IA> subMdl : subMdls) res.addSubmodel(subMdl); return res; @@ -359,7 +359,7 @@ public class StackedDatasetTrainer<IS, IA, O, AM extends Model<IA, O>, L> * @return Feature extractor which will be used for aggregator trainer from original 
feature extractor. */ private static <IS, IA, K, V> IgniteBiFunction<K, V, Vector> getFeatureExtractorForAggregator( - IgniteBiFunction<K, V, Vector> featureExtractor, List<Model<IS, IA>> subMdls, + IgniteBiFunction<K, V, Vector> featureExtractor, List<IgniteModel<IS, IA>> subMdls, IgniteFunction<IS, IA> submodelInput2AggregatingInputConverter, IgniteFunction<IA, Vector> submodelOutput2VectorConverter, IgniteFunction<Vector, IS> vector2SubmodelInputConverter) { @@ -389,11 +389,11 @@ public class StackedDatasetTrainer<IS, IA, O, AM extends Model<IA, O>, L> * @return Result of application of {@code submodelOutput2VectorConverter . mdl . vector2SubmodelInputConverter} * where dot denotes functions composition. */ - private static <IS, IA> Vector applyToVector(Model<IS, IA> mdl, + private static <IS, IA> Vector applyToVector(IgniteModel<IS, IA> mdl, IgniteFunction<IA, Vector> submodelOutput2VectorConverter, IgniteFunction<Vector, IS> vector2SubmodelInputConverter, Vector v) { - return vector2SubmodelInputConverter.andThen(mdl).andThen(submodelOutput2VectorConverter).apply(v); + return vector2SubmodelInputConverter.andThen(mdl::predict).andThen(submodelOutput2VectorConverter).apply(v); } /** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/2dc0d9f7/modules/ml/src/main/java/org/apache/ignite/ml/composition/stacking/StackedModel.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/composition/stacking/StackedModel.java b/modules/ml/src/main/java/org/apache/ignite/ml/composition/stacking/StackedModel.java index cb64d01..a9be8f8 100644 --- a/modules/ml/src/main/java/org/apache/ignite/ml/composition/stacking/StackedModel.java +++ b/modules/ml/src/main/java/org/apache/ignite/ml/composition/stacking/StackedModel.java @@ -19,7 +19,7 @@ package org.apache.ignite.ml.composition.stacking; import java.util.ArrayList; import java.util.List; -import org.apache.ignite.ml.Model; +import org.apache.ignite.ml.IgniteModel; import org.apache.ignite.ml.math.functions.IgniteBinaryOperator; import org.apache.ignite.ml.math.functions.IgniteFunction; @@ -40,15 +40,15 @@ import org.apache.ignite.ml.math.functions.IgniteFunction; * @param <O> Type of aggregator model output. * @param <AM> Type of aggregator model. */ -public class StackedModel<IS, IA, O, AM extends Model<IA, O>> implements Model<IS, O> { +public class StackedModel<IS, IA, O, AM extends IgniteModel<IA, O>> implements IgniteModel<IS, O> { /** Submodels layer. */ - private Model<IS, IA> subModelsLayer; + private IgniteModel<IS, IA> subModelsLayer; /** Aggregator model. */ private final AM aggregatorMdl; /** Models constituting submodels layer. */ - private List<Model<IS, IA>> submodels; + private List<IgniteModel<IS, IA>> submodels; /** Binary operator merging submodels outputs. */ private final IgniteBinaryOperator<IA> aggregatingInputMerger; @@ -75,7 +75,7 @@ public class StackedModel<IS, IA, O, AM extends Model<IA, O>> implements Model<I * * @return Submodels constituting first layer of this model. 
*/ - List<Model<IS, IA>> submodels() { + List<IgniteModel<IS, IA>> submodels() { return submodels; } @@ -93,14 +93,14 @@ public class StackedModel<IS, IA, O, AM extends Model<IA, O>> implements Model<I * * @param subMdl Submodel to add. */ - void addSubmodel(Model<IS, IA> subMdl) { + void addSubmodel(IgniteModel<IS, IA> subMdl) { submodels.add(subMdl); subModelsLayer = subModelsLayer != null ? subModelsLayer.combine(subMdl, aggregatingInputMerger) : subMdl; } /** {@inheritDoc} */ - @Override public O apply(IS is) { - return subModelsLayer.andThen(aggregatorMdl).apply(is); + @Override public O predict(IS is) { + return subModelsLayer.andThen(aggregatorMdl).predict(is); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/2dc0d9f7/modules/ml/src/main/java/org/apache/ignite/ml/composition/stacking/StackedVectorDatasetTrainer.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/composition/stacking/StackedVectorDatasetTrainer.java b/modules/ml/src/main/java/org/apache/ignite/ml/composition/stacking/StackedVectorDatasetTrainer.java index 16eaec2..7230e3c 100644 --- a/modules/ml/src/main/java/org/apache/ignite/ml/composition/stacking/StackedVectorDatasetTrainer.java +++ b/modules/ml/src/main/java/org/apache/ignite/ml/composition/stacking/StackedVectorDatasetTrainer.java @@ -17,7 +17,7 @@ package org.apache.ignite.ml.composition.stacking; -import org.apache.ignite.ml.Model; +import org.apache.ignite.ml.IgniteModel; import org.apache.ignite.ml.environment.LearningEnvironmentBuilder; import org.apache.ignite.ml.math.functions.IgniteBinaryOperator; import org.apache.ignite.ml.math.functions.IgniteFunction; @@ -35,7 +35,7 @@ import org.apache.ignite.ml.trainers.DatasetTrainer; * @param <L> Type of labels. * @param <AM> Type of aggregator model. 
*/ -public class StackedVectorDatasetTrainer<O, AM extends Model<Vector, O>, L> +public class StackedVectorDatasetTrainer<O, AM extends IgniteModel<Vector, O>, L> extends SimpleStackedDatasetTrainer<Vector, O, AM, L> { /** * Constructs instance of this class. @@ -58,7 +58,7 @@ public class StackedVectorDatasetTrainer<O, AM extends Model<Vector, O>, L> } /** {@inheritDoc} */ - @Override public <M1 extends Model<Vector, Vector>> StackedVectorDatasetTrainer<O, AM, L> addTrainer( + @Override public <M1 extends IgniteModel<Vector, Vector>> StackedVectorDatasetTrainer<O, AM, L> addTrainer( DatasetTrainer<M1, L> trainer) { return (StackedVectorDatasetTrainer<O, AM, L>)super.addTrainer(trainer); } @@ -127,7 +127,7 @@ public class StackedVectorDatasetTrainer<O, AM extends Model<Vector, O>, L> * @param <M1> Type of submodel trainer model. * @return This object. */ - public <M1 extends Model<Vector, Double>> StackedVectorDatasetTrainer<O, AM, L> addTrainerWithDoubleOutput( + public <M1 extends IgniteModel<Vector, Double>> StackedVectorDatasetTrainer<O, AM, L> addTrainerWithDoubleOutput( DatasetTrainer<M1, L> trainer) { return addTrainer(AdaptableDatasetTrainer.of(trainer).afterTrainedModel(VectorUtils::num2Vec)); } @@ -140,7 +140,7 @@ public class StackedVectorDatasetTrainer<O, AM extends Model<Vector, O>, L> * @param <M1> Type of submodel trainer model. * @return This object. 
*/ - public <M1 extends Model<Matrix, Matrix>> StackedVectorDatasetTrainer<O, AM, L> addMatrix2MatrixTrainer( + public <M1 extends IgniteModel<Matrix, Matrix>> StackedVectorDatasetTrainer<O, AM, L> addMatrix2MatrixTrainer( DatasetTrainer<M1, L> trainer) { AdaptableDatasetTrainer<Vector, Vector, Matrix, Matrix, M1, L> adapted = AdaptableDatasetTrainer.of(trainer) .beforeTrainedModel((Vector v) -> new DenseMatrix(v.asArray(), 1)) http://git-wip-us.apache.org/repos/asf/ignite/blob/2dc0d9f7/modules/ml/src/main/java/org/apache/ignite/ml/environment/logging/ConsoleLogger.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/environment/logging/ConsoleLogger.java b/modules/ml/src/main/java/org/apache/ignite/ml/environment/logging/ConsoleLogger.java index c124e06..940d8cf 100644 --- a/modules/ml/src/main/java/org/apache/ignite/ml/environment/logging/ConsoleLogger.java +++ b/modules/ml/src/main/java/org/apache/ignite/ml/environment/logging/ConsoleLogger.java @@ -17,7 +17,7 @@ package org.apache.ignite.ml.environment.logging; -import org.apache.ignite.ml.Model; +import org.apache.ignite.ml.IgniteModel; import org.apache.ignite.ml.math.Tracer; import org.apache.ignite.ml.math.primitives.vector.Vector; @@ -57,7 +57,7 @@ public class ConsoleLogger implements MLLogger { } /** {@inheritDoc} */ - @Override public <K, V> Model<K, V> log(VerboseLevel verboseLevel, Model<K, V> mdl) { + @Override public <K, V> IgniteModel<K, V> log(VerboseLevel verboseLevel, IgniteModel<K, V> mdl) { print(verboseLevel, mdl.toString(true)); return mdl; } http://git-wip-us.apache.org/repos/asf/ignite/blob/2dc0d9f7/modules/ml/src/main/java/org/apache/ignite/ml/environment/logging/CustomMLLogger.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/environment/logging/CustomMLLogger.java 
b/modules/ml/src/main/java/org/apache/ignite/ml/environment/logging/CustomMLLogger.java index 90aed14..a644d0c 100644 --- a/modules/ml/src/main/java/org/apache/ignite/ml/environment/logging/CustomMLLogger.java +++ b/modules/ml/src/main/java/org/apache/ignite/ml/environment/logging/CustomMLLogger.java @@ -18,7 +18,7 @@ package org.apache.ignite.ml.environment.logging; import org.apache.ignite.IgniteLogger; -import org.apache.ignite.ml.Model; +import org.apache.ignite.ml.IgniteModel; import org.apache.ignite.ml.math.Tracer; import org.apache.ignite.ml.math.primitives.vector.Vector; @@ -54,7 +54,7 @@ public class CustomMLLogger implements MLLogger { } /** {@inheritDoc} */ - @Override public <K, V> Model<K, V> log(VerboseLevel verboseLevel, Model<K, V> mdl) { + @Override public <K, V> IgniteModel<K, V> log(VerboseLevel verboseLevel, IgniteModel<K, V> mdl) { log(verboseLevel, mdl.toString(true)); return mdl; } http://git-wip-us.apache.org/repos/asf/ignite/blob/2dc0d9f7/modules/ml/src/main/java/org/apache/ignite/ml/environment/logging/MLLogger.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/environment/logging/MLLogger.java b/modules/ml/src/main/java/org/apache/ignite/ml/environment/logging/MLLogger.java index b2b4739..3c12ba0 100644 --- a/modules/ml/src/main/java/org/apache/ignite/ml/environment/logging/MLLogger.java +++ b/modules/ml/src/main/java/org/apache/ignite/ml/environment/logging/MLLogger.java @@ -17,7 +17,7 @@ package org.apache.ignite.ml.environment.logging; -import org.apache.ignite.ml.Model; +import org.apache.ignite.ml.IgniteModel; import org.apache.ignite.ml.math.primitives.vector.Vector; /** @@ -51,7 +51,7 @@ public interface MLLogger { * @param verboseLevel Verbose level. * @param mdl Model. 
*/ - public <K, V> Model<K,V> log(VerboseLevel verboseLevel, Model<K, V> mdl); + public <K, V> IgniteModel<K,V> log(VerboseLevel verboseLevel, IgniteModel<K, V> mdl); /** * Log line with formatting. http://git-wip-us.apache.org/repos/asf/ignite/blob/2dc0d9f7/modules/ml/src/main/java/org/apache/ignite/ml/environment/logging/NoOpLogger.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/environment/logging/NoOpLogger.java b/modules/ml/src/main/java/org/apache/ignite/ml/environment/logging/NoOpLogger.java index 729604f..7e73d62 100644 --- a/modules/ml/src/main/java/org/apache/ignite/ml/environment/logging/NoOpLogger.java +++ b/modules/ml/src/main/java/org/apache/ignite/ml/environment/logging/NoOpLogger.java @@ -17,7 +17,7 @@ package org.apache.ignite.ml.environment.logging; -import org.apache.ignite.ml.Model; +import org.apache.ignite.ml.IgniteModel; import org.apache.ignite.ml.math.primitives.vector.Vector; /** @@ -40,7 +40,7 @@ public class NoOpLogger implements MLLogger { } /** {@inheritDoc} */ - @Override public <K, V> Model<K, V> log(VerboseLevel verboseLevel, Model<K, V> mdl) { + @Override public <K, V> IgniteModel<K, V> log(VerboseLevel verboseLevel, IgniteModel<K, V> mdl) { return mdl; } http://git-wip-us.apache.org/repos/asf/ignite/blob/2dc0d9f7/modules/ml/src/main/java/org/apache/ignite/ml/inference/InfModel.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/inference/InfModel.java b/modules/ml/src/main/java/org/apache/ignite/ml/inference/InfModel.java deleted file mode 100644 index c5b95e4..0000000 --- a/modules/ml/src/main/java/org/apache/ignite/ml/inference/InfModel.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.ml.inference; - -import java.util.function.Function; - -/** - * Inference model that can be used to make predictions. - * - * @param <I> Type of model input. - * @param <O> Type of model output. - */ -public interface InfModel<I, O> extends Function<I, O>, AutoCloseable { - /** - * Make a prediction for the specified input arguments. - * - * @param input Input arguments. - * @return Prediction result. - */ - public O apply(I input); - - /** {@inheritDoc} */ - public void close(); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/2dc0d9f7/modules/ml/src/main/java/org/apache/ignite/ml/inference/Model.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/inference/Model.java b/modules/ml/src/main/java/org/apache/ignite/ml/inference/Model.java new file mode 100644 index 0000000..74233ee --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/inference/Model.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ml.inference; + +/** + * Inference model that can be used to make predictions. + * + * @param <I> Type of model input. + * @param <O> Type of model output. + */ +public interface Model<I, O> extends AutoCloseable { + /** + * Make a prediction for the specified input arguments. + * + * @param input Input arguments. + * @return Prediction result. + */ + public O predict(I input); + + /** {@inheritDoc} */ + public void close(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/2dc0d9f7/modules/ml/src/main/java/org/apache/ignite/ml/inference/ModelDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/inference/ModelDescriptor.java b/modules/ml/src/main/java/org/apache/ignite/ml/inference/ModelDescriptor.java index 8c8980c..7034cf9 100644 --- a/modules/ml/src/main/java/org/apache/ignite/ml/inference/ModelDescriptor.java +++ b/modules/ml/src/main/java/org/apache/ignite/ml/inference/ModelDescriptor.java @@ -18,11 +18,11 @@ package org.apache.ignite.ml.inference; import java.io.Serializable; -import org.apache.ignite.ml.inference.parser.InfModelParser; -import org.apache.ignite.ml.inference.reader.InfModelReader; +import org.apache.ignite.ml.inference.parser.ModelParser; +import org.apache.ignite.ml.inference.reader.ModelReader; /** - * Model 
descriptor that encapsulates information about model, {@link InfModelReader} and {@link InfModelParser} which + * Model descriptor that encapsulates information about model, {@link ModelReader} and {@link ModelParser} which * is required to build the model. */ public class ModelDescriptor implements Serializable { @@ -36,10 +36,10 @@ public class ModelDescriptor implements Serializable { private final ModelSignature signature; /** Model reader. */ - private final InfModelReader reader; + private final ModelReader reader; /** Model parser. */ - private final InfModelParser<byte[], byte[], ?> parser; + private final ModelParser<byte[], byte[], ?> parser; /** * Constructs a new instance of model descriptor. @@ -50,8 +50,8 @@ public class ModelDescriptor implements Serializable { * @param reader Model reader. * @param parser Model parser. */ - public ModelDescriptor(String name, String desc, ModelSignature signature, InfModelReader reader, - InfModelParser<byte[], byte[], ?> parser) { + public ModelDescriptor(String name, String desc, ModelSignature signature, ModelReader reader, + ModelParser<byte[], byte[], ?> parser) { this.name = name; this.desc = desc; this.signature = signature; @@ -75,12 +75,12 @@ public class ModelDescriptor implements Serializable { } /** */ - public InfModelReader getReader() { + public ModelReader getReader() { return reader; } /** */ - public InfModelParser<byte[], byte[], ?> getParser() { + public ModelParser<byte[], byte[], ?> getParser() { return parser; } http://git-wip-us.apache.org/repos/asf/ignite/blob/2dc0d9f7/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/AsyncInfModelBuilder.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/AsyncInfModelBuilder.java b/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/AsyncInfModelBuilder.java deleted file mode 100644 index e8b7e86..0000000 --- 
a/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/AsyncInfModelBuilder.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.ml.inference.builder; - -import java.io.Serializable; -import java.util.concurrent.Future; -import org.apache.ignite.ml.inference.InfModel; -import org.apache.ignite.ml.inference.parser.InfModelParser; -import org.apache.ignite.ml.inference.reader.InfModelReader; - -/** - * Builder of asynchronous inference model. Uses specified model reader (see {@link InfModelReader}) and mode parser - * (see {@link InfModelParser}) to build a model. - */ -@FunctionalInterface -public interface AsyncInfModelBuilder { - /** - * Builds asynchronous inference model using specified model reader and model parser. - * - * @param reader Model reader. - * @param parser Model parser. - * @param <I> Type of model input. - * @param <O> Type of model output. - * @return Inference model. 
- */ - public <I extends Serializable, O extends Serializable> InfModel<I, Future<O>> build(InfModelReader reader, - InfModelParser<I, O, ?> parser); -} http://git-wip-us.apache.org/repos/asf/ignite/blob/2dc0d9f7/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/AsyncModelBuilder.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/AsyncModelBuilder.java b/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/AsyncModelBuilder.java new file mode 100644 index 0000000..2537192 --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/AsyncModelBuilder.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ml.inference.builder; + +import java.io.Serializable; +import java.util.concurrent.Future; +import org.apache.ignite.ml.inference.Model; +import org.apache.ignite.ml.inference.parser.ModelParser; +import org.apache.ignite.ml.inference.reader.ModelReader; + +/** + * Builder of asynchronous inference model. Uses specified model reader (see {@link ModelReader}) and mode parser + * (see {@link ModelParser}) to build a model. 
+ */ +@FunctionalInterface +public interface AsyncModelBuilder { + /** + * Builds asynchronous inference model using specified model reader and model parser. + * + * @param reader Model reader. + * @param parser Model parser. + * @param <I> Type of model input. + * @param <O> Type of model output. + * @return Inference model. + */ + public <I extends Serializable, O extends Serializable> Model<I, Future<O>> build(ModelReader reader, + ModelParser<I, O, ?> parser); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2dc0d9f7/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/IgniteDistributedInfModelBuilder.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/IgniteDistributedInfModelBuilder.java b/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/IgniteDistributedInfModelBuilder.java deleted file mode 100644 index 8347b7c..0000000 --- a/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/IgniteDistributedInfModelBuilder.java +++ /dev/null @@ -1,368 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.ml.inference.builder; - -import java.io.Serializable; -import java.util.UUID; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteQueue; -import org.apache.ignite.Ignition; -import org.apache.ignite.configuration.CollectionConfiguration; -import org.apache.ignite.ml.inference.InfModel; -import org.apache.ignite.ml.inference.parser.InfModelParser; -import org.apache.ignite.ml.inference.reader.InfModelReader; -import org.apache.ignite.services.Service; -import org.apache.ignite.services.ServiceContext; - -/** - * Builder that allows to start Apache Ignite services for distributed inference and get a facade that allows to work - * with this distributed inference infrastructure as with a single inference model (see {@link InfModel}). - * - * The common workflow is based on a request/response queues and multiple workers represented by Apache Ignite services. - * When the {@link #build(InfModelReader, InfModelParser)} method is called Apache Ignite starts the specified number of - * service instances and request/response queues. Each service instance reads request queue, processes inbound requests - * and writes responses to response queue. The facade returned by the {@link #build(InfModelReader, InfModelParser)} - * method operates with request/response queues. When the {@link InfModel#apply(Object)} method is called the argument - * is sent as a request to the request queue. When the response is appeared in the response queue the {@link Future} - * correspondent to the previously sent request is completed and the processing finishes. 
- * - * Be aware that {@link InfModel#close()} method must be called to clear allocated resources, stop services and remove - * queues. - */ -public class IgniteDistributedInfModelBuilder implements AsyncInfModelBuilder { - /** Template of the inference service name. */ - private static final String INFERENCE_SERVICE_NAME_PATTERN = "inference_service_%s"; - - /** Template of the inference request queue name. */ - private static final String INFERENCE_REQUEST_QUEUE_NAME_PATTERN = "inference_queue_req_%s"; - - /** Template of the inference response queue name. */ - private static final String INFERENCE_RESPONSE_QUEUE_NAME_PATTERN = "inference_queue_res_%s"; - - /** Default capacity for all queues used in this class (request queue, response queue, received queue). */ - private static final int QUEUE_CAPACITY = 100; - - /** Default configuration for Apache Ignite queues used in this class (request queue, response queue). */ - private static final CollectionConfiguration queueCfg = new CollectionConfiguration(); - - /** Ignite instance. */ - private final Ignite ignite; - - /** Number of service instances maintaining to make distributed inference. */ - private final int instances; - - /** Max per node number of instances. */ - private final int maxPerNode; - - /** - * Constructs a new instance of Ignite distributed inference model builder. - * - * @param ignite Ignite instance. - * @param instances Number of service instances maintaining to make distributed inference. - * @param maxPerNode Max per node number of instances. - */ - public IgniteDistributedInfModelBuilder(Ignite ignite, int instances, int maxPerNode) { - this.ignite = ignite; - this.instances = instances; - this.maxPerNode = maxPerNode; - } - - /** - * Starts the specified in constructor number of service instances and request/response queues. Each service - * instance reads request queue, processes inbound requests and writes responses to response queue. 
The returned - * facade is represented by the {@link InfModel} operates with request/response queues, but hides these details - * behind {@link InfModel#apply(Object)} method of {@link InfModel}. - * - * Be aware that {@link InfModel#close()} method must be called to clear allocated resources, stop services and - * remove queues. - * - * @param reader Inference model reader. - * @param parser Inference model parser. - * @param <I> Type of model input. - * @param <O> Type of model output. - * @return Facade represented by {@link InfModel}. - */ - @Override public <I extends Serializable, O extends Serializable> InfModel<I, Future<O>> build( - InfModelReader reader, InfModelParser<I, O, ?> parser) { - return new DistributedInfModel<>(ignite, UUID.randomUUID().toString(), reader, parser, instances, maxPerNode); - } - - /** - * Facade that operates with request/response queues to make distributed inference, but hides these details - * behind {@link InfModel#apply(Object)} method of {@link InfModel}. - * - * Be aware that {@link InfModel#close()} method must be called to clear allocated resources, stop services and - * remove queues. - * - * @param <I> Type of model input. - * @param <O> Type of model output. - */ - private static class DistributedInfModel<I extends Serializable, O extends Serializable> - implements InfModel<I, Future<O>> { - /** Ignite instance. */ - private final Ignite ignite; - - /** Suffix that with correspondent templates formats service and queue names. */ - private final String suffix; - - /** Request queue. */ - private final IgniteQueue<I> reqQueue; - - /** Response queue. */ - private final IgniteQueue<O> resQueue; - - /** Futures that represents requests that have been sent, but haven't been responded yet. */ - private final BlockingQueue<CompletableFuture<O>> futures = new ArrayBlockingQueue<>(QUEUE_CAPACITY); - - /** Thread pool for receiver to work in. 
*/ - private final ExecutorService receiverThreadPool = Executors.newSingleThreadExecutor(); - - /** Flag identified that model is up and running. */ - private final AtomicBoolean running = new AtomicBoolean(false); - - /** Receiver future. */ - private volatile Future<?> receiverFut; - - /** - * Constructs a new instance of distributed inference model. - * - * @param ignite Ignite instance. - * @param suffix Suffix that with correspondent templates formats service and queue names. - * @param reader Inference model reader. - * @param parser Inference model parser. - * @param instances Number of service instances maintaining to make distributed inference. - * @param maxPerNode Max per node number of instances. - */ - DistributedInfModel(Ignite ignite, String suffix, InfModelReader reader, InfModelParser<I, O, ?> parser, - int instances, int maxPerNode) { - this.ignite = ignite; - this.suffix = suffix; - - reqQueue = ignite.queue(String.format(INFERENCE_REQUEST_QUEUE_NAME_PATTERN, suffix), QUEUE_CAPACITY, - queueCfg); - resQueue = ignite.queue(String.format(INFERENCE_RESPONSE_QUEUE_NAME_PATTERN, suffix), QUEUE_CAPACITY, - queueCfg); - - startReceiver(); - startService(reader, parser, instances, maxPerNode); - - running.set(true); - } - - /** {@inheritDoc} */ - @Override public Future<O> apply(I input) { - if (!running.get()) - throw new IllegalStateException("Inference model is not running"); - - CompletableFuture<O> fut = new CompletableFuture<>(); - - try { - futures.put(fut); - } - catch (InterruptedException e) { - close(); // In case of exception in the above code the model state becomes invalid and model is closed. - throw new RuntimeException(e); - } - - reqQueue.put(input); - return fut; - } - - /** - * Starts Apache Ignite services that represent distributed inference infrastructure. - * - * @param reader Inference model reader. - * @param parser Inference model parser. 
- * @param instances Number of service instances maintaining to make distributed inference. - * @param maxPerNode Max per node number of instances. - */ - private void startService(InfModelReader reader, InfModelParser<I, O, ?> parser, int instances, int maxPerNode) { - ignite.services().deployMultiple( - String.format(INFERENCE_SERVICE_NAME_PATTERN, suffix), - new IgniteDistributedInfModelService<>(reader, parser, suffix), - instances, - maxPerNode - ); - } - - /** - * Stops Apache Ignite services that represent distributed inference infrastructure. - */ - private void stopService() { - ignite.services().cancel(String.format(INFERENCE_SERVICE_NAME_PATTERN, suffix)); - } - - /** - * Starts the thread that reads the response queue and completed correspondent futures from {@link #futures} - * queue. - */ - private void startReceiver() { - receiverFut = receiverThreadPool.submit(() -> { - try { - while (!Thread.currentThread().isInterrupted()) { - O res; - try { - res = resQueue.take(); - } - catch (IllegalStateException e) { - if (!resQueue.removed()) - throw e; - continue; - } - - CompletableFuture<O> fut = futures.remove(); - fut.complete(res); - } - } - finally { - close(); // If the model is not stopped yet we need to stop it to protect queue from new writes. - while (!futures.isEmpty()) { - CompletableFuture<O> fut = futures.remove(); - fut.cancel(true); - } - } - }); - } - - /** - * Stops receiver thread that reads the response queue and completed correspondent futures from - * {@link #futures} queue. - */ - private void stopReceiver() { - if (receiverFut != null && !receiverFut.isDone()) - receiverFut.cancel(true); - // The receiver thread pool is not reused, so it should be closed here. - receiverThreadPool.shutdown(); - } - - /** - * Remove request/response Ignite queues. 
- */ - private void removeQueues() { - reqQueue.close(); - resQueue.close(); - } - - /** {@inheritDoc} */ - @Override public void close() { - boolean runningBefore = running.getAndSet(false); - - if (runningBefore) { - stopService(); - stopReceiver(); - removeQueues(); - } - } - } - - /** - * Apache Ignite service that makes inference reading requests from the request queue and writing responses to the - * response queue. This service is assumed to be deployed in {@link #build(InfModelReader, InfModelParser)} method - * and cancelled in {@link InfModel#close()} method of the inference model. - * - * @param <I> Type of model input. - * @param <O> Type of model output. - */ - private static class IgniteDistributedInfModelService<I extends Serializable, O extends Serializable> - implements Service { - /** */ - private static final long serialVersionUID = -3596084917874395597L; - - /** Inference model reader. */ - private final InfModelReader reader; - - /** Inference model parser. */ - private final InfModelParser<I, O, ?> parser; - - /** Suffix that with correspondent templates formats service and queue names. */ - private final String suffix; - - /** Request queue, is created in {@link #init(ServiceContext)} method. */ - private transient IgniteQueue<I> reqQueue; - - /** Response queue, is created in {@link #init(ServiceContext)} method. */ - private transient IgniteQueue<O> resQueue; - - /** Inference model, is created in {@link #init(ServiceContext)} method. */ - private transient InfModel<I, O> mdl; - - /** - * Constructs a new instance of Ignite distributed inference model service. - * - * @param reader Inference model reader. - * @param parser Inference model parser. - * @param suffix Suffix that with correspondent templates formats service and queue names. 
- */ - IgniteDistributedInfModelService(InfModelReader reader, InfModelParser<I, O, ?> parser, String suffix) { - this.reader = reader; - this.parser = parser; - this.suffix = suffix; - } - - /** {@inheritDoc} */ - @Override public void init(ServiceContext ctx) { - Ignite ignite = Ignition.localIgnite(); - - reqQueue = ignite.queue(String.format(INFERENCE_REQUEST_QUEUE_NAME_PATTERN, suffix), QUEUE_CAPACITY, - queueCfg); - resQueue = ignite.queue(String.format(INFERENCE_RESPONSE_QUEUE_NAME_PATTERN, suffix), QUEUE_CAPACITY, - queueCfg); - - mdl = parser.parse(reader.read()); - } - - /** {@inheritDoc} */ - @Override public void execute(ServiceContext ctx) { - while (!ctx.isCancelled()) { - I req; - try { - req = reqQueue.take(); - } - catch (IllegalStateException e) { - // If the queue is removed during the take() operation exception should be ignored. - if (!reqQueue.removed()) - throw e; - continue; - } - - O res = mdl.apply(req); - - try { - resQueue.put(res); - } - catch (IllegalStateException e) { - // If the queue is removed during the put() operation exception should be ignored. - if (!resQueue.removed()) - throw e; - } - } - } - - /** {@inheritDoc} */ - @Override public void cancel(ServiceContext ctx) { - // Do nothing. Queues are assumed to be closed in model close() method. 
- } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/2dc0d9f7/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/IgniteDistributedModelBuilder.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/IgniteDistributedModelBuilder.java b/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/IgniteDistributedModelBuilder.java new file mode 100644 index 0000000..1444ba7 --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/IgniteDistributedModelBuilder.java @@ -0,0 +1,368 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.ml.inference.builder; + +import java.io.Serializable; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteQueue; +import org.apache.ignite.Ignition; +import org.apache.ignite.configuration.CollectionConfiguration; +import org.apache.ignite.ml.inference.Model; +import org.apache.ignite.ml.inference.parser.ModelParser; +import org.apache.ignite.ml.inference.reader.ModelReader; +import org.apache.ignite.services.Service; +import org.apache.ignite.services.ServiceContext; + +/** + * Builder that allows to start Apache Ignite services for distributed inference and get a facade that allows to work + * with this distributed inference infrastructure as with a single inference model (see {@link Model}). + * + * The common workflow is based on a request/response queues and multiple workers represented by Apache Ignite services. + * When the {@link #build(ModelReader, ModelParser)} method is called Apache Ignite starts the specified number of + * service instances and request/response queues. Each service instance reads request queue, processes inbound requests + * and writes responses to response queue. The facade returned by the {@link #build(ModelReader, ModelParser)} + * method operates with request/response queues. When the {@link Model#predict(Object)} method is called the argument + * is sent as a request to the request queue. When the response is appeared in the response queue the {@link Future} + * correspondent to the previously sent request is completed and the processing finishes. 
+ * + * Be aware that {@link Model#close()} method must be called to clear allocated resources, stop services and remove + * queues. + */ +public class IgniteDistributedModelBuilder implements AsyncModelBuilder { + /** Template of the inference service name. */ + private static final String INFERENCE_SERVICE_NAME_PATTERN = "inference_service_%s"; + + /** Template of the inference request queue name. */ + private static final String INFERENCE_REQUEST_QUEUE_NAME_PATTERN = "inference_queue_req_%s"; + + /** Template of the inference response queue name. */ + private static final String INFERENCE_RESPONSE_QUEUE_NAME_PATTERN = "inference_queue_res_%s"; + + /** Default capacity for all queues used in this class (request queue, response queue, received queue). */ + private static final int QUEUE_CAPACITY = 100; + + /** Default configuration for Apache Ignite queues used in this class (request queue, response queue). */ + private static final CollectionConfiguration queueCfg = new CollectionConfiguration(); + + /** Ignite instance. */ + private final Ignite ignite; + + /** Number of service instances maintaining to make distributed inference. */ + private final int instances; + + /** Max per node number of instances. */ + private final int maxPerNode; + + /** + * Constructs a new instance of Ignite distributed inference model builder. + * + * @param ignite Ignite instance. + * @param instances Number of service instances maintaining to make distributed inference. + * @param maxPerNode Max per node number of instances. + */ + public IgniteDistributedModelBuilder(Ignite ignite, int instances, int maxPerNode) { + this.ignite = ignite; + this.instances = instances; + this.maxPerNode = maxPerNode; + } + + /** + * Starts the specified in constructor number of service instances and request/response queues. Each service + * instance reads request queue, processes inbound requests and writes responses to response queue. 
The returned + * facade is represented by the {@link Model} operates with request/response queues, but hides these details + * behind {@link Model#predict(Object)} method of {@link Model}. + * + * Be aware that {@link Model#close()} method must be called to clear allocated resources, stop services and + * remove queues. + * + * @param reader Inference model reader. + * @param parser Inference model parser. + * @param <I> Type of model input. + * @param <O> Type of model output. + * @return Facade represented by {@link Model}. + */ + @Override public <I extends Serializable, O extends Serializable> Model<I, Future<O>> build( + ModelReader reader, ModelParser<I, O, ?> parser) { + return new DistributedInfModel<>(ignite, UUID.randomUUID().toString(), reader, parser, instances, maxPerNode); + } + + /** + * Facade that operates with request/response queues to make distributed inference, but hides these details + * behind {@link Model#predict(Object)} method of {@link Model}. + * + * Be aware that {@link Model#close()} method must be called to clear allocated resources, stop services and + * remove queues. + * + * @param <I> Type of model input. + * @param <O> Type of model output. + */ + private static class DistributedInfModel<I extends Serializable, O extends Serializable> + implements Model<I, Future<O>> { + /** Ignite instance. */ + private final Ignite ignite; + + /** Suffix that with correspondent templates formats service and queue names. */ + private final String suffix; + + /** Request queue. */ + private final IgniteQueue<I> reqQueue; + + /** Response queue. */ + private final IgniteQueue<O> resQueue; + + /** Futures that represents requests that have been sent, but haven't been responded yet. */ + private final BlockingQueue<CompletableFuture<O>> futures = new ArrayBlockingQueue<>(QUEUE_CAPACITY); + + /** Thread pool for receiver to work in. 
*/ + private final ExecutorService receiverThreadPool = Executors.newSingleThreadExecutor(); + + /** Flag identified that model is up and running. */ + private final AtomicBoolean running = new AtomicBoolean(false); + + /** Receiver future. */ + private volatile Future<?> receiverFut; + + /** + * Constructs a new instance of distributed inference model. + * + * @param ignite Ignite instance. + * @param suffix Suffix that with correspondent templates formats service and queue names. + * @param reader Inference model reader. + * @param parser Inference model parser. + * @param instances Number of service instances maintaining to make distributed inference. + * @param maxPerNode Max per node number of instances. + */ + DistributedInfModel(Ignite ignite, String suffix, ModelReader reader, ModelParser<I, O, ?> parser, + int instances, int maxPerNode) { + this.ignite = ignite; + this.suffix = suffix; + + reqQueue = ignite.queue(String.format(INFERENCE_REQUEST_QUEUE_NAME_PATTERN, suffix), QUEUE_CAPACITY, + queueCfg); + resQueue = ignite.queue(String.format(INFERENCE_RESPONSE_QUEUE_NAME_PATTERN, suffix), QUEUE_CAPACITY, + queueCfg); + + startReceiver(); + startService(reader, parser, instances, maxPerNode); + + running.set(true); + } + + /** {@inheritDoc} */ + @Override public Future<O> predict(I input) { + if (!running.get()) + throw new IllegalStateException("Inference model is not running"); + + CompletableFuture<O> fut = new CompletableFuture<>(); + + try { + futures.put(fut); + } + catch (InterruptedException e) { + close(); // In case of exception in the above code the model state becomes invalid and model is closed. + throw new RuntimeException(e); + } + + reqQueue.put(input); + return fut; + } + + /** + * Starts Apache Ignite services that represent distributed inference infrastructure. + * + * @param reader Inference model reader. + * @param parser Inference model parser. + * @param instances Number of service instances maintaining to make distributed inference. 
+ * @param maxPerNode Max per node number of instances. + */ + private void startService(ModelReader reader, ModelParser<I, O, ?> parser, int instances, int maxPerNode) { + ignite.services().deployMultiple( + String.format(INFERENCE_SERVICE_NAME_PATTERN, suffix), + new IgniteDistributedInfModelService<>(reader, parser, suffix), + instances, + maxPerNode + ); + } + + /** + * Stops Apache Ignite services that represent distributed inference infrastructure. + */ + private void stopService() { + ignite.services().cancel(String.format(INFERENCE_SERVICE_NAME_PATTERN, suffix)); + } + + /** + * Starts the thread that reads the response queue and completed correspondent futures from {@link #futures} + * queue. + */ + private void startReceiver() { + receiverFut = receiverThreadPool.submit(() -> { + try { + while (!Thread.currentThread().isInterrupted()) { + O res; + try { + res = resQueue.take(); + } + catch (IllegalStateException e) { + if (!resQueue.removed()) + throw e; + continue; + } + + CompletableFuture<O> fut = futures.remove(); + fut.complete(res); + } + } + finally { + close(); // If the model is not stopped yet we need to stop it to protect queue from new writes. + while (!futures.isEmpty()) { + CompletableFuture<O> fut = futures.remove(); + fut.cancel(true); + } + } + }); + } + + /** + * Stops receiver thread that reads the response queue and completed correspondent futures from + * {@link #futures} queue. + */ + private void stopReceiver() { + if (receiverFut != null && !receiverFut.isDone()) + receiverFut.cancel(true); + // The receiver thread pool is not reused, so it should be closed here. + receiverThreadPool.shutdown(); + } + + /** + * Remove request/response Ignite queues. 
+ */ + private void removeQueues() { + reqQueue.close(); + resQueue.close(); + } + + /** {@inheritDoc} */ + @Override public void close() { + boolean runningBefore = running.getAndSet(false); + + if (runningBefore) { + stopService(); + stopReceiver(); + removeQueues(); + } + } + } + + /** + * Apache Ignite service that makes inference reading requests from the request queue and writing responses to the + * response queue. This service is assumed to be deployed in {@link #build(ModelReader, ModelParser)} method + * and cancelled in {@link Model#close()} method of the inference model. + * + * @param <I> Type of model input. + * @param <O> Type of model output. + */ + private static class IgniteDistributedInfModelService<I extends Serializable, O extends Serializable> + implements Service { + /** */ + private static final long serialVersionUID = -3596084917874395597L; + + /** Inference model reader. */ + private final ModelReader reader; + + /** Inference model parser. */ + private final ModelParser<I, O, ?> parser; + + /** Suffix that with correspondent templates formats service and queue names. */ + private final String suffix; + + /** Request queue, is created in {@link #init(ServiceContext)} method. */ + private transient IgniteQueue<I> reqQueue; + + /** Response queue, is created in {@link #init(ServiceContext)} method. */ + private transient IgniteQueue<O> resQueue; + + /** Inference model, is created in {@link #init(ServiceContext)} method. */ + private transient Model<I, O> mdl; + + /** + * Constructs a new instance of Ignite distributed inference model service. + * + * @param reader Inference model reader. + * @param parser Inference model parser. + * @param suffix Suffix that with correspondent templates formats service and queue names. 
+ */ + IgniteDistributedInfModelService(ModelReader reader, ModelParser<I, O, ?> parser, String suffix) { + this.reader = reader; + this.parser = parser; + this.suffix = suffix; + } + + /** {@inheritDoc} */ + @Override public void init(ServiceContext ctx) { + Ignite ignite = Ignition.localIgnite(); + + reqQueue = ignite.queue(String.format(INFERENCE_REQUEST_QUEUE_NAME_PATTERN, suffix), QUEUE_CAPACITY, + queueCfg); + resQueue = ignite.queue(String.format(INFERENCE_RESPONSE_QUEUE_NAME_PATTERN, suffix), QUEUE_CAPACITY, + queueCfg); + + mdl = parser.parse(reader.read()); + } + + /** {@inheritDoc} */ + @Override public void execute(ServiceContext ctx) { + while (!ctx.isCancelled()) { + I req; + try { + req = reqQueue.take(); + } + catch (IllegalStateException e) { + // If the queue is removed during the take() operation exception should be ignored. + if (!reqQueue.removed()) + throw e; + continue; + } + + O res = mdl.predict(req); + + try { + resQueue.put(res); + } + catch (IllegalStateException e) { + // If the queue is removed during the put() operation exception should be ignored. + if (!resQueue.removed()) + throw e; + } + } + } + + /** {@inheritDoc} */ + @Override public void cancel(ServiceContext ctx) { + // Do nothing. Queues are assumed to be closed in model close() method. 
+ } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2dc0d9f7/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/SingleInfModelBuilder.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/SingleInfModelBuilder.java b/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/SingleInfModelBuilder.java deleted file mode 100644 index 032ebab..0000000 --- a/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/SingleInfModelBuilder.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.ml.inference.builder; - -import java.io.Serializable; -import org.apache.ignite.ml.inference.InfModel; -import org.apache.ignite.ml.inference.parser.InfModelParser; -import org.apache.ignite.ml.inference.reader.InfModelReader; - -/** - * Implementation of synchronous inference model builder that builds a model processed locally in a single thread. 
- */ -public class SingleInfModelBuilder implements SyncInfModelBuilder { - /** {@inheritDoc} */ - @Override public <I extends Serializable, O extends Serializable, M extends InfModel<I, O>> M build(InfModelReader reader, - InfModelParser<I, O, M> parser) { - return parser.parse(reader.read()); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/2dc0d9f7/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/SingleModelBuilder.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/SingleModelBuilder.java b/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/SingleModelBuilder.java new file mode 100644 index 0000000..8e9fa19 --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/SingleModelBuilder.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.ml.inference.builder; + +import java.io.Serializable; +import org.apache.ignite.ml.inference.Model; +import org.apache.ignite.ml.inference.parser.ModelParser; +import org.apache.ignite.ml.inference.reader.ModelReader; + +/** + * Implementation of synchronous inference model builder that builds a model processed locally in a single thread. + */ +public class SingleModelBuilder implements SyncModelBuilder { + /** {@inheritDoc} */ + @Override public <I extends Serializable, O extends Serializable, M extends Model<I, O>> M build(ModelReader reader, + ModelParser<I, O, M> parser) { + return parser.parse(reader.read()); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2dc0d9f7/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/SyncInfModelBuilder.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/SyncInfModelBuilder.java b/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/SyncInfModelBuilder.java deleted file mode 100644 index f9883fc..0000000 --- a/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/SyncInfModelBuilder.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.ml.inference.builder; - -import java.io.Serializable; -import org.apache.ignite.ml.inference.InfModel; -import org.apache.ignite.ml.inference.parser.InfModelParser; -import org.apache.ignite.ml.inference.reader.InfModelReader; - -/** - * Builder of synchronous inference model. Uses specified model reader (see {@link InfModelReader}) and mode parser (see - * {@link InfModelParser}) to build a model. - */ -@FunctionalInterface -public interface SyncInfModelBuilder { - /** - * Builds synchronous inference model using specified model reader and model parser. - * - * @param reader Model reader. - * @param parser Model parser. - * @param <I> Type of model input. - * @param <O> Type of model output. - * @return Inference model. - */ - public <I extends Serializable, O extends Serializable, M extends InfModel<I, O>> M build(InfModelReader reader, - InfModelParser<I, O, M> parser); -} http://git-wip-us.apache.org/repos/asf/ignite/blob/2dc0d9f7/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/SyncModelBuilder.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/SyncModelBuilder.java b/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/SyncModelBuilder.java new file mode 100644 index 0000000..ed4f496 --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/SyncModelBuilder.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ml.inference.builder; + +import java.io.Serializable; +import org.apache.ignite.ml.inference.Model; +import org.apache.ignite.ml.inference.parser.ModelParser; +import org.apache.ignite.ml.inference.reader.ModelReader; + +/** + * Builder of synchronous inference model. Uses specified model reader (see {@link ModelReader}) and model parser (see + * {@link ModelParser}) to build a model. + */ +@FunctionalInterface +public interface SyncModelBuilder { + /** + * Builds synchronous inference model using specified model reader and model parser. + * + * @param reader Model reader. + * @param parser Model parser. + * @param <I> Type of model input. + * @param <O> Type of model output. + * @return Inference model.
+ */ + public <I extends Serializable, O extends Serializable, M extends Model<I, O>> M build(ModelReader reader, + ModelParser<I, O, M> parser); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2dc0d9f7/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/ThreadedInfModelBuilder.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/ThreadedInfModelBuilder.java b/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/ThreadedInfModelBuilder.java deleted file mode 100644 index b39cb8d..0000000 --- a/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/ThreadedInfModelBuilder.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.ml.inference.builder; - -import java.io.Serializable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import org.apache.ignite.ml.inference.InfModel; -import org.apache.ignite.ml.inference.parser.InfModelParser; -import org.apache.ignite.ml.inference.reader.InfModelReader; - -/** - * Implementation of asynchronous inference model builder that builds model processed locally utilizing specified number - * of threads. - */ -public class ThreadedInfModelBuilder implements AsyncInfModelBuilder { - /** Number of threads to be utilized for model inference. */ - private final int threads; - - /** - * Constructs a new instance of threaded inference model builder. - * - * @param threads Number of threads to be utilized for model inference. - */ - public ThreadedInfModelBuilder(int threads) { - this.threads = threads; - } - - /** {@inheritDoc} */ - @Override public <I extends Serializable, O extends Serializable> InfModel<I, Future<O>> build( - InfModelReader reader, InfModelParser<I, O, ?> parser) { - return new ThreadedInfModel<>(parser.parse(reader.read()), threads); - } - - /** - * Threaded inference model that performs inference in multiply threads. - * - * @param <I> Type of model input. - * @param <O> Type of model output. - */ - private static class ThreadedInfModel<I extends Serializable, O extends Serializable> - implements InfModel<I, Future<O>> { - /** Inference model. */ - private final InfModel<I, O> mdl; - - /** Thread pool. */ - private final ExecutorService threadPool; - - /** - * Constructs a new instance of threaded inference model. - * - * @param mdl Inference model. - * @param threads Thread pool. 
- */ - ThreadedInfModel(InfModel<I, O> mdl, int threads) { - this.mdl = mdl; - this.threadPool = Executors.newFixedThreadPool(threads); - } - - /** {@inheritDoc} */ - @Override public Future<O> apply(I input) { - return threadPool.submit(() -> mdl.apply(input)); - } - - /** {@inheritDoc} */ - @Override public void close() { - threadPool.shutdown(); - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/2dc0d9f7/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/ThreadedModelBuilder.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/ThreadedModelBuilder.java b/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/ThreadedModelBuilder.java new file mode 100644 index 0000000..a38bf8f --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/ThreadedModelBuilder.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.ml.inference.builder; + +import java.io.Serializable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import org.apache.ignite.ml.inference.Model; +import org.apache.ignite.ml.inference.parser.ModelParser; +import org.apache.ignite.ml.inference.reader.ModelReader; + +/** + * Implementation of asynchronous inference model builder that builds model processed locally utilizing specified number + * of threads. + */ +public class ThreadedModelBuilder implements AsyncModelBuilder { + /** Number of threads to be utilized for model inference. */ + private final int threads; + + /** + * Constructs a new instance of threaded inference model builder. + * + * @param threads Number of threads to be utilized for model inference. + */ + public ThreadedModelBuilder(int threads) { + this.threads = threads; + } + + /** {@inheritDoc} */ + @Override public <I extends Serializable, O extends Serializable> Model<I, Future<O>> build( + ModelReader reader, ModelParser<I, O, ?> parser) { + return new ThreadedInfModel<>(parser.parse(reader.read()), threads); + } + + /** + * Threaded inference model that performs inference in multiple threads. + * + * @param <I> Type of model input. + * @param <O> Type of model output. + */ + private static class ThreadedInfModel<I extends Serializable, O extends Serializable> + implements Model<I, Future<O>> { + /** Inference model. */ + private final Model<I, O> mdl; + + /** Thread pool. */ + private final ExecutorService threadPool; + + /** + * Constructs a new instance of threaded inference model. + * + * @param mdl Inference model. + * @param threads Number of threads in the thread pool.
+ */ + ThreadedInfModel(Model<I, O> mdl, int threads) { + this.mdl = mdl; + this.threadPool = Executors.newFixedThreadPool(threads); + } + + /** {@inheritDoc} */ + @Override public Future<O> predict(I input) { + return threadPool.submit(() -> mdl.predict(input)); + } + + /** {@inheritDoc} */ + @Override public void close() { + threadPool.shutdown(); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2dc0d9f7/modules/ml/src/main/java/org/apache/ignite/ml/inference/parser/IgniteFunctionInfModelParser.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/inference/parser/IgniteFunctionInfModelParser.java b/modules/ml/src/main/java/org/apache/ignite/ml/inference/parser/IgniteFunctionInfModelParser.java deleted file mode 100644 index 9c8a862..0000000 --- a/modules/ml/src/main/java/org/apache/ignite/ml/inference/parser/IgniteFunctionInfModelParser.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.ml.inference.parser; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.ObjectInputStream; -import org.apache.ignite.ml.inference.InfModel; -import org.apache.ignite.ml.math.functions.IgniteFunction; - -/** - * Implementation of model parser that accepts serialized {@link IgniteFunction}. - * - * @param <I> Type of model input. - * @param <O> Type of model output. - */ -public class IgniteFunctionInfModelParser<I, O> implements InfModelParser<I, O, InfModel<I, O>> { - /** */ - private static final long serialVersionUID = -4624683614990816434L; - - /** {@inheritDoc} */ - @Override public InfModel<I, O> parse(byte[] mdl) { - try (ByteArrayInputStream bais = new ByteArrayInputStream(mdl); - ObjectInputStream ois = new ObjectInputStream(bais)) { - @SuppressWarnings("unchecked") - IgniteFunction<I, O> function = (IgniteFunction<I, O>)ois.readObject(); - - return new IgniteFunctionInfoModel(function); - } - catch (IOException | ClassNotFoundException e) { - throw new RuntimeException(e); - } - } - - /** - * Inference model that wraps {@link IgniteFunction}. - */ - private class IgniteFunctionInfoModel implements InfModel<I, O> { - /** Ignite function. */ - private final IgniteFunction<I, O> function; - - /** - * Constructs a new instance of Ignite function. - * - * @param function Ignite function. - */ - IgniteFunctionInfoModel(IgniteFunction<I, O> function) { - this.function = function; - } - - /** {@inheritDoc} */ - @Override public O apply(I input) { - return function.apply(input); - } - - /** {@inheritDoc} */ - @Override public void close() { - // Do nothing. 
- } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/2dc0d9f7/modules/ml/src/main/java/org/apache/ignite/ml/inference/parser/IgniteModelParser.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/inference/parser/IgniteModelParser.java b/modules/ml/src/main/java/org/apache/ignite/ml/inference/parser/IgniteModelParser.java new file mode 100644 index 0000000..372d8c5 --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/inference/parser/IgniteModelParser.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ml.inference.parser; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import org.apache.ignite.ml.IgniteModel; +import org.apache.ignite.ml.math.functions.IgniteFunction; + +/** + * Implementation of model parser that accepts serialized {@link IgniteFunction}. + * + * @param <I> Type of model input. + * @param <O> Type of model output. 
+ */ +public class IgniteModelParser<I, O> implements ModelParser<I, O, IgniteModel<I, O>> { + /** */ + private static final long serialVersionUID = -4624683614990816434L; + + /** {@inheritDoc} */ + @Override public IgniteModel<I, O> parse(byte[] mdl) { + try (ByteArrayInputStream bais = new ByteArrayInputStream(mdl); + ObjectInputStream ois = new ObjectInputStream(bais)) { + @SuppressWarnings("unchecked") + IgniteModel<I, O> res = (IgniteModel<I, O>)ois.readObject(); + + return res; + } + catch (IOException | ClassNotFoundException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2dc0d9f7/modules/ml/src/main/java/org/apache/ignite/ml/inference/parser/InfModelParser.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/inference/parser/InfModelParser.java b/modules/ml/src/main/java/org/apache/ignite/ml/inference/parser/InfModelParser.java deleted file mode 100644 index df5659c..0000000 --- a/modules/ml/src/main/java/org/apache/ignite/ml/inference/parser/InfModelParser.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.ml.inference.parser; - -import java.io.Serializable; -import org.apache.ignite.ml.inference.InfModel; - -/** - * Model parser that accepts a serialized model represented by byte array, parses it and returns {@link InfModel}. - * - * @param <I> Type of model input. - * @param <O> Type of model output. - */ -@FunctionalInterface -public interface InfModelParser<I, O, M extends InfModel<I, O>> extends Serializable { - /** - * Accepts serialized model represented by byte array, parses it and returns {@link InfModel}. - * - * @param mdl Serialized model represented by byte array. - * @return Inference model. - */ - public M parse(byte[] mdl); -}