http://git-wip-us.apache.org/repos/asf/ignite/blob/47cfdc27/modules/ml/src/main/java/org/apache/ignite/ml/nn/UpdatesStrategy.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/nn/UpdatesStrategy.java b/modules/ml/src/main/java/org/apache/ignite/ml/nn/UpdatesStrategy.java
new file mode 100644
index 0000000..e48d946
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/nn/UpdatesStrategy.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.nn;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.ignite.ml.math.functions.IgniteFunction;
+import org.apache.ignite.ml.optimization.updatecalculators.ParameterUpdateCalculator;
+
+/**
+ * Class encapsulating update strategies for group trainers based on updates.
+ *
+ * @param <M> Type of model to be optimized.
+ * @param <U> Type of update.
+ */
+public class UpdatesStrategy<M, U extends Serializable> {
+    /**
+     * {@link ParameterUpdateCalculator}.
+     */
+    private ParameterUpdateCalculator<M, U> updatesCalculator;
+
+    /**
+     * Function used to reduce updates in one training (for example, sum all sequential gradient updates to get one
+     * gradient update).
+     */
+    private IgniteFunction<List<U>, U> locStepUpdatesReducer;
+
+    /**
+     * Function used to reduce updates from different trainings (for example, averaging of gradients of all parallel trainings).
+     */
+    private IgniteFunction<List<U>, U> allUpdatesReducer;
+
+    /**
+     * Construct instance of this class with given parameters.
+     *
+     * @param updatesCalculator Parameter update calculator.
+     * @param locStepUpdatesReducer Function used to reduce updates in one training
+     * (for example, sum all sequential gradient updates to get one gradient update).
+     * @param allUpdatesReducer Function used to reduce updates from different trainings
+     * (for example, averaging of gradients of all parallel trainings).
+     */
+    public UpdatesStrategy(
+        ParameterUpdateCalculator<M, U> updatesCalculator,
+        IgniteFunction<List<U>, U> locStepUpdatesReducer,
+        IgniteFunction<List<U>, U> allUpdatesReducer) {
+        this.updatesCalculator = updatesCalculator;
+        this.locStepUpdatesReducer = locStepUpdatesReducer;
+        this.allUpdatesReducer = allUpdatesReducer;
+    }
+
+    /**
+     * Get parameter update calculator (see {@link ParameterUpdateCalculator}).
+     *
+     * @return Parameter update calculator.
+     */
+    public ParameterUpdateCalculator<M, U> getUpdatesCalculator() {
+        return updatesCalculator;
+    }
+
+    /**
+     * Get function used to reduce updates in one training
+     * (for example, sum all sequential gradient updates to get one gradient update).
+     *
+     * @return Function used to reduce updates in one training
+     * (for example, sum all sequential gradient updates to get one gradient update).
+     */
+    public IgniteFunction<List<U>, U> locStepUpdatesReducer() {
+        return locStepUpdatesReducer;
+    }
+
+    /**
+     * Get function used to reduce updates from different trainings
+     * (for example, averaging of gradients of all parallel trainings).
+     *
+     * @return Function used to reduce updates from different trainings.
+     */
+    public IgniteFunction<List<U>, U> allUpdatesReducer() {
+        return allUpdatesReducer;
+    }
+}

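For orientation, a minimal usage sketch of the new UpdatesStrategy class (hedged: SimpleGDUpdateCalculator and SimpleGDParameterUpdate are assumed to come from org.apache.ignite.ml.optimization.updatecalculators; the exact wiring below is illustrative, not taken from this commit):

    import org.apache.ignite.ml.nn.MultilayerPerceptron;
    import org.apache.ignite.ml.nn.UpdatesStrategy;
    import org.apache.ignite.ml.optimization.updatecalculators.SimpleGDParameterUpdate;
    import org.apache.ignite.ml.optimization.updatecalculators.SimpleGDUpdateCalculator;

    // Sum sequential local updates; average updates across parallel trainings.
    UpdatesStrategy<MultilayerPerceptron, SimpleGDParameterUpdate> stgy =
        new UpdatesStrategy<>(
            new SimpleGDUpdateCalculator(0.2),  // assumed ctor taking learning rate
            SimpleGDParameterUpdate::sumLocal,  // locStepUpdatesReducer
            SimpleGDParameterUpdate::avg        // allUpdatesReducer
        );
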
http://git-wip-us.apache.org/repos/asf/ignite/blob/47cfdc27/modules/ml/src/main/java/org/apache/ignite/ml/optimization/GradientDescent.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/optimization/GradientDescent.java b/modules/ml/src/main/java/org/apache/ignite/ml/optimization/GradientDescent.java
deleted file mode 100644
index 15ed914..0000000
--- a/modules/ml/src/main/java/org/apache/ignite/ml/optimization/GradientDescent.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.ml.optimization;
-
-import org.apache.ignite.ml.math.Matrix;
-import org.apache.ignite.ml.math.StorageConstants;
-import org.apache.ignite.ml.math.Vector;
-import org.apache.ignite.ml.math.functions.IgniteFunction;
-import org.apache.ignite.ml.math.impls.matrix.SparseDistributedMatrix;
-import org.apache.ignite.ml.math.impls.vector.DenseLocalOnHeapVector;
-import org.apache.ignite.ml.math.impls.vector.FunctionVector;
-import org.apache.ignite.ml.optimization.util.SparseDistributedMatrixMapReducer;
-
-/**
- * Gradient descent optimizer.
- */
-public class GradientDescent {
-    /**
-     * Function which computes gradient of the loss function at any given point.
-     */
-    private final GradientFunction lossGradient;
-
-    /**
-     * Weights updater applied on every gradient descent step to decide how weights should be changed.
-     */
-    private final Updater updater;
-
-    /**
-     * Max number of gradient descent iterations.
-     */
-    private int maxIterations = 1000;
-
-    /**
-     * Convergence tolerance is condition which decides iteration termination.
-     */
-    private double convergenceTol = 1e-8;
-
-    /**
-     * New gradient descent instance based of loss function and updater.
-     *
-     * @param lossGradient Function which computes gradient of the loss function at any given point
-     * @param updater Weights updater applied on every gradient descent step to decide how weights should be changed
-     */
-    public GradientDescent(GradientFunction lossGradient, Updater updater) {
-        this.lossGradient = lossGradient;
-        this.updater = updater;
-    }
-
-    /**
-     * Sets max number of gradient descent iterations.
-     *
-     * @param maxIterations Max number of gradient descent iterations
-     * @return This gradient descent instance
-     */
-    public GradientDescent withMaxIterations(int maxIterations) {
-        assert maxIterations >= 0;
-
-        this.maxIterations = maxIterations;
-
-        return this;
-    }
-
-    /**
-     * Sets convergence tolerance.
-     *
-     * @param convergenceTol Condition which decides iteration termination
-     * @return This gradient descent instance
-     */
-    public GradientDescent withConvergenceTol(double convergenceTol) {
-        assert convergenceTol >= 0;
-
-        this.convergenceTol = convergenceTol;
-
-        return this;
-    }
-
-    /**
-     * Computes point where loss function takes minimal value.
-     *
-     * @param data Inputs parameters of loss function
-     * @param initWeights Initial weights
-     * @return Point where loss function takes minimal value
-     */
-    public Vector optimize(Matrix data, Vector initWeights) {
-        Vector weights = initWeights, oldWeights = null, oldGradient = null;
-        IgniteFunction<Vector, Vector> gradientFunction = getLossGradientFunction(data);
-
-        for (int iteration = 0; iteration < maxIterations; iteration++) {
-            Vector gradient = gradientFunction.apply(weights);
-            Vector newWeights = updater.compute(oldWeights, oldGradient, weights, gradient, iteration);
-
-            if (isConverged(weights, newWeights))
-                return newWeights;
-            else {
-                oldGradient = gradient;
-                oldWeights = weights;
-                weights = newWeights;
-            }
-        }
-        return weights;
-    }
-
-    /**
-     * Calculates gradient based in distributed matrix using {@link SparseDistributedMatrixMapReducer}.
-     *
-     * @param data Distributed matrix
-     * @param weights Point to calculate gradient
-     * @return Gradient
-     */
-    private Vector calculateDistributedGradient(SparseDistributedMatrix data, Vector weights) {
-        SparseDistributedMatrixMapReducer mapReducer = new SparseDistributedMatrixMapReducer(data);
-        return mapReducer.mapReduce(
-            (matrix, args) -> {
-                Matrix inputs = extractInputs(matrix);
-                Vector groundTruth = extractGroundTruth(matrix);
-
-                return lossGradient.compute(inputs, groundTruth, args);
-            },
-            gradients -> {
-                int cnt = 0;
-                Vector resGradient = new DenseLocalOnHeapVector(data.columnSize());
-
-                for (Vector gradient : gradients) {
-                    if (gradient != null) {
-                        resGradient = resGradient.plus(gradient);
-                        cnt++;
-                    }
-                }
-
-                return resGradient.divide(cnt);
-            },
-            weights);
-    }
-
-    /**
-     * Tests if gradient descent process converged.
-     *
-     * @param weights Weights
-     * @param newWeights New weights
-     * @return {@code true} if process has converged, otherwise {@code false}
-     */
-    private boolean isConverged(Vector weights, Vector newWeights) {
-        if (convergenceTol == 0)
-            return false;
-        else {
-            double solutionVectorDiff = weights.minus(newWeights).kNorm(2.0);
-            return solutionVectorDiff < convergenceTol * Math.max(newWeights.kNorm(2.0), 1.0);
-        }
-    }
-
-    /**
-     * Extracts first column with ground truth from the data set matrix.
-     *
-     * @param data data to build model
-     * @return Ground truth vector
-     */
-    private Vector extractGroundTruth(Matrix data) {
-        return data.getCol(0);
-    }
-
-    /**
-     * Extracts all inputs from data set matrix and updates matrix so that first column contains value 1.0.
-     *
-     * @param data data to build model
-     * @return Inputs matrix
-     */
-    private Matrix extractInputs(Matrix data) {
-        data = data.copy();
-        data.assignColumn(0, new FunctionVector(data.rowSize(), row -> 1.0));
-        return data;
-    }
-
-    /** Makes carrying of the gradient function and fixes data matrix. */
-    private IgniteFunction<Vector, Vector> getLossGradientFunction(Matrix data) {
-        if (data instanceof SparseDistributedMatrix) {
-            SparseDistributedMatrix distributedMatrix = (SparseDistributedMatrix)data;
-
-            if (distributedMatrix.getStorage().storageMode() == StorageConstants.ROW_STORAGE_MODE)
-                return weights -> calculateDistributedGradient(distributedMatrix, weights);
-        }
-
-        Matrix inputs = extractInputs(data);
-        Vector groundTruth = extractGroundTruth(data);
-
-        return weights -> lossGradient.compute(inputs, groundTruth, weights);
-    }
-}

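The removed GradientDescent stopped iterating once ||w - wNew||_2 < convergenceTol * max(||wNew||_2, 1). A self-contained sketch of that relative-tolerance test on raw arrays (illustrative only, mirroring isConverged() above):

    // Stop when the step is small relative to the new iterate's norm.
    static boolean converged(double[] w, double[] wNew, double tol) {
        double diffSq = 0.0, normSq = 0.0;
        for (int i = 0; i < w.length; i++) {
            double d = w[i] - wNew[i];
            diffSq += d * d;
            normSq += wNew[i] * wNew[i];
        }
        return Math.sqrt(diffSq) < tol * Math.max(Math.sqrt(normSq), 1.0);
    }

The max(..., 1.0) guard keeps the criterion meaningful near the origin, where a purely relative test would become overly strict.
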
http://git-wip-us.apache.org/repos/asf/ignite/blob/47cfdc27/modules/ml/src/main/java/org/apache/ignite/ml/optimization/GradientFunction.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/optimization/GradientFunction.java b/modules/ml/src/main/java/org/apache/ignite/ml/optimization/GradientFunction.java
deleted file mode 100644
index a6a1e71..0000000
--- a/modules/ml/src/main/java/org/apache/ignite/ml/optimization/GradientFunction.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.ml.optimization;
-
-import java.io.Serializable;
-import org.apache.ignite.ml.math.Matrix;
-import org.apache.ignite.ml.math.Vector;
-
-/**
- * Function which computes gradient of the loss function at any given point.
- */
-@FunctionalInterface
-public interface GradientFunction extends Serializable {
-    /** */
-    Vector compute(Matrix inputs, Vector groundTruth, Vector pnt);
-}

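Since GradientFunction was a @FunctionalInterface, an implementation could be supplied as a lambda; for instance, the least-squares gradient X^T(Xw - y) that the next file implements as a class could equally be written as (sketch):

    GradientFunction lsGrad = (inputs, groundTruth, pnt) ->
        inputs.transpose().times(inputs.times(pnt).minus(groundTruth));
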
http://git-wip-us.apache.org/repos/asf/ignite/blob/47cfdc27/modules/ml/src/main/java/org/apache/ignite/ml/optimization/LeastSquaresGradientFunction.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/optimization/LeastSquaresGradientFunction.java b/modules/ml/src/main/java/org/apache/ignite/ml/optimization/LeastSquaresGradientFunction.java
deleted file mode 100644
index 4d90e3b..0000000
--- a/modules/ml/src/main/java/org/apache/ignite/ml/optimization/LeastSquaresGradientFunction.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.ml.optimization;
-
-import org.apache.ignite.ml.math.Matrix;
-import org.apache.ignite.ml.math.Vector;
-
-/**
- * Function which computes gradient of least square loss function.
- */
-public class LeastSquaresGradientFunction implements GradientFunction {
-    /**
-     * {@inheritDoc}
-     */
-    @Override public Vector compute(Matrix inputs, Vector groundTruth, Vector pnt) {
-        return inputs.transpose().times(inputs.times(pnt).minus(groundTruth));
-    }
-}
\ No newline at end of file

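For reference, the expression above is the gradient of the unscaled least-squares loss L(w) = 1/2 * ||Xw - y||^2: differentiating gives grad L(w) = X^T(Xw - y), which is exactly what inputs.transpose().times(inputs.times(pnt).minus(groundTruth)) computes with inputs = X, groundTruth = y and pnt = w.
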
http://git-wip-us.apache.org/repos/asf/ignite/blob/47cfdc27/modules/ml/src/main/java/org/apache/ignite/ml/optimization/util/SparseDistributedMatrixMapReducer.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/optimization/util/SparseDistributedMatrixMapReducer.java b/modules/ml/src/main/java/org/apache/ignite/ml/optimization/util/SparseDistributedMatrixMapReducer.java
deleted file mode 100644
index 20f861e..0000000
--- a/modules/ml/src/main/java/org/apache/ignite/ml/optimization/util/SparseDistributedMatrixMapReducer.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.ml.optimization.util;
-
-import java.util.Collection;
-import java.util.Map;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.Ignition;
-import org.apache.ignite.cache.affinity.Affinity;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.ml.math.Matrix;
-import org.apache.ignite.ml.math.distributed.keys.RowColMatrixKey;
-import org.apache.ignite.ml.math.functions.IgniteBiFunction;
-import org.apache.ignite.ml.math.functions.IgniteFunction;
-import org.apache.ignite.ml.math.impls.matrix.DenseLocalOnHeapMatrix;
-import org.apache.ignite.ml.math.impls.matrix.SparseDistributedMatrix;
-import org.apache.ignite.ml.math.impls.storage.matrix.SparseDistributedMatrixStorage;
-
-/**
- * Wrapper of {@link SparseDistributedMatrix} which allow to perform computation on every node containing a part of the
- * distributed matrix, get results and then reduce them.
- */
-public class SparseDistributedMatrixMapReducer {
-    /** */
-    private final SparseDistributedMatrix distributedMatrix;
-
-    /** */
-    public SparseDistributedMatrixMapReducer(
-        SparseDistributedMatrix distributedMatrix) {
-        this.distributedMatrix = distributedMatrix;
-    }
-
-    /** */
-    public <R, T> R mapReduce(IgniteBiFunction<Matrix, T, R> mapper, IgniteFunction<Collection<R>, R> reducer, T args) {
-        Ignite ignite = Ignition.localIgnite();
-        SparseDistributedMatrixStorage storage = (SparseDistributedMatrixStorage)distributedMatrix.getStorage();
-
-        int colSize = distributedMatrix.columnSize();
-
-        Collection<R> results = ignite
-            .compute(ignite.cluster().forDataNodes(storage.cacheName()))
-            .broadcast(arguments -> {
-                Ignite locIgnite = Ignition.localIgnite();
-
-                Affinity<RowColMatrixKey> affinity = locIgnite.affinity(storage.cacheName());
-                ClusterNode locNode = locIgnite.cluster().localNode();
-
-                Map<ClusterNode, Collection<RowColMatrixKey>> keys = affinity.mapKeysToNodes(storage.getAllKeys());
-                Collection<RowColMatrixKey> locKeys = keys.get(locNode);
-
-                if (locKeys != null) {
-                    int idx = 0;
-                    Matrix locMatrix = new DenseLocalOnHeapMatrix(locKeys.size(), colSize);
-
-                    for (RowColMatrixKey key : locKeys) {
-                        Map<Integer, Double> row = storage.cache().get(key);
-
-                        for (Map.Entry<Integer, Double> cell : row.entrySet())
-                            locMatrix.set(idx, cell.getKey(), cell.getValue());
-
-                        idx++;
-                    }
-                    return mapper.apply(locMatrix, arguments);
-                }
-                return null;
-            }, args);
-        return reducer.apply(results);
-    }
-}

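A hedged usage sketch of the removed map-reducer: map each node-local block of a SparseDistributedMatrix to its row count, then sum the partial results. Note that nodes holding no local keys return null from the mapper, so the reducer must tolerate nulls:

    SparseDistributedMatrixMapReducer mapReducer =
        new SparseDistributedMatrixMapReducer(distributedMatrix);

    Integer totalRows = mapReducer.mapReduce(
        (locMatrix, args) -> locMatrix.rowSize(),  // per-node computation
        partials -> {
            int sum = 0;
            for (Integer cnt : partials)
                if (cnt != null)  // nodes without local keys contribute null
                    sum += cnt;
            return sum;
        },
        null);  // no extra arguments needed for this computation
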
http://git-wip-us.apache.org/repos/asf/ignite/blob/47cfdc27/modules/ml/src/main/java/org/apache/ignite/ml/optimization/util/package-info.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/optimization/util/package-info.java b/modules/ml/src/main/java/org/apache/ignite/ml/optimization/util/package-info.java
deleted file mode 100644
index cb01ab6..0000000
--- a/modules/ml/src/main/java/org/apache/ignite/ml/optimization/util/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * <!-- Package description. -->
- * Contains util classes used in optimization package.
- */
-package org.apache.ignite.ml.optimization.util;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/47cfdc27/modules/ml/src/main/java/org/apache/ignite/ml/regressions/linear/LinearRegressionLSQRTrainer.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/regressions/linear/LinearRegressionLSQRTrainer.java b/modules/ml/src/main/java/org/apache/ignite/ml/regressions/linear/LinearRegressionLSQRTrainer.java
index 9526db1..095aa31 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/regressions/linear/LinearRegressionLSQRTrainer.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/regressions/linear/LinearRegressionLSQRTrainer.java
@@ -17,18 +17,17 @@
 
 package org.apache.ignite.ml.regressions.linear;
 
+import java.util.Arrays;
 import org.apache.ignite.ml.dataset.DatasetBuilder;
+import org.apache.ignite.ml.dataset.primitive.builder.data.SimpleLabeledDatasetDataBuilder;
 import org.apache.ignite.ml.math.Vector;
 import org.apache.ignite.ml.math.functions.IgniteBiFunction;
 import org.apache.ignite.ml.math.impls.vector.DenseLocalOnHeapVector;
-import org.apache.ignite.ml.math.isolve.LinSysPartitionDataBuilderOnHeap;
 import org.apache.ignite.ml.math.isolve.lsqr.AbstractLSQR;
 import org.apache.ignite.ml.math.isolve.lsqr.LSQROnHeap;
 import org.apache.ignite.ml.math.isolve.lsqr.LSQRResult;
 import org.apache.ignite.ml.trainers.SingleLabelDatasetTrainer;
 
-import java.util.Arrays;
-
 /**
  * Trainer of the linear regression model based on LSQR algorithm.
  *
@@ -43,7 +42,10 @@ public class LinearRegressionLSQRTrainer implements SingleLabelDatasetTrainer<Li
 
         try (LSQROnHeap<K, V> lsqr = new LSQROnHeap<>(
             datasetBuilder,
-            new LinSysPartitionDataBuilderOnHeap<>(new FeatureExtractorWrapper<>(featureExtractor), lbExtractor)
+            new SimpleLabeledDatasetDataBuilder<>(
+                new FeatureExtractorWrapper<>(featureExtractor),
+                lbExtractor.andThen(e -> new double[]{e})
+            )
         )) {
             res = lsqr.solve(0, 1e-12, 1e-12, 1e8, -1, false, null);
         }

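The switch from LinSysPartitionDataBuilderOnHeap to SimpleLabeledDatasetDataBuilder changes the expected label shape: the new builder works with double[] labels, while lbExtractor yields a single scalar, hence the adapter lbExtractor.andThen(e -> new double[]{e}). A hedged sketch of that lifting (assuming IgniteBiFunction.andThen preserves the Ignite functional type, as its use in the diff implies):

    // lbExtractor: (K, V) -> Double; the builder wants (K, V) -> double[].
    IgniteBiFunction<K, V, double[]> wrapped =
        lbExtractor.andThen(e -> new double[] {e});
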
http://git-wip-us.apache.org/repos/asf/ignite/blob/47cfdc27/modules/ml/src/main/java/org/apache/ignite/ml/regressions/linear/LinearRegressionQRTrainer.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/regressions/linear/LinearRegressionQRTrainer.java b/modules/ml/src/main/java/org/apache/ignite/ml/regressions/linear/LinearRegressionQRTrainer.java
deleted file mode 100644
index 5de3cda..0000000
--- a/modules/ml/src/main/java/org/apache/ignite/ml/regressions/linear/LinearRegressionQRTrainer.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.ml.regressions.linear;
-
-import org.apache.ignite.ml.Trainer;
-import org.apache.ignite.ml.math.Matrix;
-import org.apache.ignite.ml.math.Vector;
-import org.apache.ignite.ml.math.decompositions.QRDSolver;
-import org.apache.ignite.ml.math.decompositions.QRDecomposition;
-import org.apache.ignite.ml.math.impls.vector.FunctionVector;
-
-/**
- * Linear regression trainer based on least squares loss function and QR decomposition.
- */
-public class LinearRegressionQRTrainer implements Trainer<LinearRegressionModel, Matrix> {
-    /**
-     * {@inheritDoc}
-     */
-    @Override public LinearRegressionModel train(Matrix data) {
-        Vector groundTruth = extractGroundTruth(data);
-        Matrix inputs = extractInputs(data);
-
-        QRDecomposition decomposition = new QRDecomposition(inputs);
-        QRDSolver solver = new QRDSolver(decomposition.getQ(), decomposition.getR());
-
-        Vector variables = solver.solve(groundTruth);
-        Vector weights = variables.viewPart(1, variables.size() - 1);
-
-        double intercept = variables.get(0);
-
-        return new LinearRegressionModel(weights, intercept);
-    }
-
-    /**
-     * Extracts first column with ground truth from the data set matrix.
-     *
-     * @param data data to build model
-     * @return Ground truth vector
-     */
-    private Vector extractGroundTruth(Matrix data) {
-        return data.getCol(0);
-    }
-
-    /**
-     * Extracts all inputs from data set matrix and updates matrix so that first column contains value 1.0.
-     *
-     * @param data data to build model
-     * @return Inputs matrix
-     */
-    private Matrix extractInputs(Matrix data) {
-        data = data.copy();
-
-        data.assignColumn(0, new FunctionVector(data.rowSize(), row -> 1.0));
-
-        return data;
-    }
-}

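For the record, the removed trainer solved the least-squares problem min_w ||Xw - y||_2 via the factorization X = QR: substituting gives Rw = Q^T y, which the solver can resolve cheaply because R is upper triangular. Since the first column of the data matrix is overwritten with ones, variables.get(0) is the intercept and the remaining entries are the feature weights (this reading of the math is an editorial gloss, not text from the commit).
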
http://git-wip-us.apache.org/repos/asf/ignite/blob/47cfdc27/modules/ml/src/main/java/org/apache/ignite/ml/regressions/linear/LinearRegressionSGDTrainer.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/regressions/linear/LinearRegressionSGDTrainer.java b/modules/ml/src/main/java/org/apache/ignite/ml/regressions/linear/LinearRegressionSGDTrainer.java
index 9be3fdd..98b8885 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/regressions/linear/LinearRegressionSGDTrainer.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/regressions/linear/LinearRegressionSGDTrainer.java
@@ -30,7 +30,7 @@ import org.apache.ignite.ml.nn.MultilayerPerceptron;
 import org.apache.ignite.ml.nn.architecture.MLPArchitecture;
 import org.apache.ignite.ml.optimization.LossFunctions;
 import org.apache.ignite.ml.trainers.SingleLabelDatasetTrainer;
-import org.apache.ignite.ml.trainers.group.UpdatesStrategy;
+import org.apache.ignite.ml.nn.UpdatesStrategy;
 
 import java.io.Serializable;
 import java.util.Arrays;
@@ -110,6 +110,9 @@ public class LinearRegressionSGDTrainer<P extends Serializable> implements Singl
 
         double[] p = mlp.parameters().getStorage().data();
 
-        return new LinearRegressionModel(new DenseLocalOnHeapVector(Arrays.copyOf(p, p.length - 1)), p[p.length - 1]);
+        return new LinearRegressionModel(new DenseLocalOnHeapVector(
+            Arrays.copyOf(p, p.length - 1)),
+            p[p.length - 1]
+        );
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/47cfdc27/modules/ml/src/main/java/org/apache/ignite/ml/trainers/Trainer.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/trainers/Trainer.java b/modules/ml/src/main/java/org/apache/ignite/ml/trainers/Trainer.java
deleted file mode 100644
index b4f83d9..0000000
--- a/modules/ml/src/main/java/org/apache/ignite/ml/trainers/Trainer.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.ml.trainers;
-
-import org.apache.ignite.ml.Model;
-
-/** Trainer interface. */
-@Deprecated
-// TODO: IGNITE-7659: Reduce multiple Trainer interfaces to one
-public interface Trainer<M extends Model, T> {
-    /**
-     * Train the model based on provided data.
-     *
-     * @param data Data for training.
-     * @return Trained model.
-     */
-    public M train(T data);
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/47cfdc27/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/BaseLocalProcessorJob.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/BaseLocalProcessorJob.java b/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/BaseLocalProcessorJob.java
deleted file mode 100644
index e20a55a..0000000
--- a/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/BaseLocalProcessorJob.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.ml.trainers.group;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.Objects;
-import java.util.UUID;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.Ignition;
-import org.apache.ignite.cache.affinity.Affinity;
-import org.apache.ignite.compute.ComputeJob;
-import org.apache.ignite.ml.math.functions.IgniteFunction;
-import org.apache.ignite.ml.math.functions.IgniteSupplier;
-
-/**
- * Base job for group training.
- * It's purpose is to apply worker to each element (cache key or cache entry) of given cache specified
- * by keySupplier. Worker produces {@link ResultAndUpdates} object which contains 'side effects' which are updates
- * needed to apply to caches and computation result.
- * After we get all {@link ResultAndUpdates} we merge all 'update' parts of them for each node
- * and apply them on corresponding node, also we reduce all 'result' by some given reducer.
- *
- * @param <K> Type of keys of cache used for group trainer.
- * @param <V> Type of values of cache used for group trainer.
- * @param <T> Type of elements to which workers are applier.
- * @param <R> Type of result of worker.
- */
-public abstract class BaseLocalProcessorJob<K, V, T, R extends Serializable> implements ComputeJob {
-    /**
-     * UUID of group training.
-     */
-    protected UUID trainingUUID;
-
-    /**
-     * Worker.
-     */
-    protected IgniteFunction<T, ResultAndUpdates<R>> worker;
-
-    /**
-     * Supplier of keys determining elements to which worker should be applied.
-     */
-    protected IgniteSupplier<Stream<GroupTrainerCacheKey<K>>> keySupplier;
-
-    /**
-     * Operator used to reduce results from worker.
-     */
-    protected IgniteFunction<List<R>, R> reducer;
-
-    /**
-     * Name of cache used for training.
-     */
-    protected String cacheName;
-
-    /**
-     * Construct instance of this class with given arguments.
-     *
-     * @param worker Worker.
-     * @param keySupplier Supplier of keys.
-     * @param reducer Reducer.
-     * @param trainingUUID UUID of training.
-     * @param cacheName Name of cache used for training.
-     */
-    public BaseLocalProcessorJob(
-        IgniteFunction<T, ResultAndUpdates<R>> worker,
-        IgniteSupplier<Stream<GroupTrainerCacheKey<K>>> keySupplier,
-        IgniteFunction<List<R>, R> reducer,
-        UUID trainingUUID, String cacheName) {
-        this.worker = worker;
-        this.keySupplier = keySupplier;
-        this.reducer = reducer;
-        this.trainingUUID = trainingUUID;
-        this.cacheName = cacheName;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void cancel() {
-        // NO-OP.
-    }
-
-    /** {@inheritDoc} */
-    @Override public R execute() throws IgniteException {
-        List<ResultAndUpdates<R>> resultsAndUpdates = toProcess().
-            map(worker).
-            collect(Collectors.toList());
-
-        ResultAndUpdates<R> totalRes = ResultAndUpdates.sum(reducer, resultsAndUpdates.stream().filter(Objects::nonNull).collect(Collectors.toList()));
-
-        totalRes.applyUpdates(ignite());
-
-        return totalRes.result();
-    }
-
-    /**
-     * Get stream of elements to process.
-     *
-     * @return Stream of elements to process.
-     */
-    protected abstract Stream<T> toProcess();
-
-    /**
-     * Ignite instance.
-     *
-     * @return Ignite instance.
-     */
-    protected static Ignite ignite() {
-        return Ignition.localIgnite();
-    }
-
-    /**
-     * Get cache used for training.
-     *
-     * @return Cache used for training.
-     */
-    protected IgniteCache<GroupTrainerCacheKey<K>, V> cache() {
-        return ignite().getOrCreateCache(cacheName);
-    }
-
-    /**
-     * Get affinity function for cache used in group training.
-     *
-     * @return Affinity function for cache used in group training.
-     */
-    protected Affinity<GroupTrainerCacheKey> affinity() {
-        return ignite().affinity(cacheName);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/47cfdc27/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/ConstModel.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/ConstModel.java b/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/ConstModel.java
deleted file mode 100644
index 75f8179..0000000
--- a/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/ConstModel.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.ml.trainers.group;
-
-import org.apache.ignite.ml.Model;
-
-/**
- * Model which outputs given constant.
- *
- * @param <T> Type of constant.
- */
-public class ConstModel<T> implements Model<T, T> {
-    /**
-     * Constant to be returned by this model.
-     */
-    private T c;
-
-    /**
-     * Create instance of this class specified by input parameters.
-     *
-     * @param c Constant to be returned by this model.
-     */
-    public ConstModel(T c) {
-        this.c = c;
-    }
-
-    /** {@inheritDoc} */
-    @Override public T apply(T val) {
-        return c;
-    }
-}

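A one-line illustration of the removed ConstModel (sketch):

    Model<Integer, Integer> always42 = new ConstModel<>(42);
    int res = always42.apply(7);  // returns 42 regardless of input
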
http://git-wip-us.apache.org/repos/asf/ignite/blob/47cfdc27/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/GroupTrainer.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/GroupTrainer.java b/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/GroupTrainer.java
deleted file mode 100644
index fb34bf7..0000000
--- a/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/GroupTrainer.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.ml.trainers.group;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.UUID;
-import java.util.stream.Stream;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.ml.Model;
-import org.apache.ignite.ml.math.functions.IgniteFunction;
-import org.apache.ignite.ml.math.functions.IgniteSupplier;
-import org.apache.ignite.ml.trainers.Trainer;
-import org.apache.ignite.ml.trainers.group.chain.ComputationsChain;
-import org.apache.ignite.ml.trainers.group.chain.EntryAndContext;
-import org.apache.ignite.ml.trainers.group.chain.HasTrainingUUID;
-
-/**
- * Class encapsulating synchronous distributed group training.
- * Training is performed by following scheme:
- * 1. For specified set of keys distributed initialization is done. For each key some initialization result is returned.
- * 2. All initialization results are processed locally and reduced into some object of type I.
- * 3. While 'shouldContinue' condition is true, training loop step is executed.
- * 4. After loop is finished, data from each key from final key set is collected.
- * 5. Data collected on previous step is transformed into a model which is returned as final result.
- * Note that all methods returning functions, suppliers etc should return values with minimal dependencies because they are serialized
- * with all dependent objects.
- *
- * @param <LC> Type of local context of the training.
- * @param <K> Type of data in {@link GroupTrainerCacheKey} keys on which the training is done.
- * @param <V> Type of cache values on which the training is done.
- * @param <IN> Type of data returned after initializing of distributed context.
- * @param <R> Type of result returned after training from each node.
- * @param <I> Type of data which is fed into each training loop step and returned from it.
- * @param <M> Type of model returned after training.
- * @param <T> Type of input to this trainer.
- * @param <G> Type of distributed context which is needed for forming final result which is send from each node to trainer for final model creation.
- */
-abstract class GroupTrainer<LC extends HasTrainingUUID, K, V, IN extends Serializable, R extends Serializable, I extends Serializable, M extends Model, T extends GroupTrainerInput<K>, G> implements Trainer<M, T> {
-    /**
-     * Cache on which training is performed. For example it can be cache of neural networks.
-     */
-    protected IgniteCache<GroupTrainerCacheKey<K>, V> cache;
-
-    /**
-     * Ignite instance.
-     */
-    protected Ignite ignite;
-
-    /**
-     * Construct an instance of this class.
-     *
-     * @param cache Cache on which training is performed.
-     * @param ignite Ignite instance.
-     */
-    GroupTrainer(
-        IgniteCache<GroupTrainerCacheKey<K>, V> cache,
-        Ignite ignite) {
-        this.cache = cache;
-        this.ignite = ignite;
-    }
-
-    /** {@inheritDoc} */
-    @Override public final M train(T data) {
-        UUID trainingUUID = UUID.randomUUID();
-        LC locCtx = initialLocalContext(data, trainingUUID);
-
-        GroupTrainingContext<K, V, LC> ctx = new GroupTrainingContext<>(locCtx, cache, ignite);
-        ComputationsChain<LC, K, V, T, T> chain = (i, c) -> i;
-        IgniteFunction<GroupTrainerCacheKey<K>, ResultAndUpdates<IN>> distributedInitializer
-            = distributedInitializer(data);
-
-        init(data, trainingUUID);
-
-        M res = chain.
-            thenDistributedForKeys(distributedInitializer, (t, lc) -> data.initialKeys(trainingUUID),
-                reduceDistributedInitData()).
-            thenLocally(this::locallyProcessInitData).
-            thenWhile(this::shouldContinue, trainingLoopStep()).
-            thenDistributedForEntries(this::extractContextForFinalResultCreation, finalResultsExtractor(),
-                this::finalResultKeys, finalResultsReducer()).
-            thenLocally(this::mapFinalResult).
-            process(data, ctx);
-
-        cleanup(locCtx);
-
-        return res;
-    }
-
-    /**
-     * Create initial local context from data given as input to trainer.
-     *
-     * @param data Data given as input to this trainer.
-     * @param trainingUUID UUID of this training.
-     * @return Initial local context.
-     */
-    protected abstract LC initialLocalContext(T data, UUID trainingUUID);
-
-    /** Override in subclasses if needed. */
-    protected void init(T data, UUID trainingUUID) {
-    }
-
-    /**
-     * Get function for initialization for each of keys specified in initial key set.
-     *
-     * @param data Data given to this trainer as input.
-     * @return Function for initialization for each of keys specified in initial key set.
-     */
-    protected abstract IgniteFunction<GroupTrainerCacheKey<K>, ResultAndUpdates<IN>> distributedInitializer(T data);
-
-    /**
-     * Get reducer to reduce data collected from initialization of each key specified in initial key set.
-     *
-     * @return Reducer to reduce data collected from initialization of each key specified in initial key set.
-     */
-    protected abstract IgniteFunction<List<IN>, IN> reduceDistributedInitData();
-
-    /**
-     * Transform data from initialization step into data which is fed as input to first step of training loop.
-     *
-     * @param data Data from initialization step.
-     * @param locCtx Local context.
-     * @return Data which is fed as input to first step of training loop.
-     */
-    protected abstract I locallyProcessInitData(IN data, LC locCtx);
-
-    /**
-     * Training loop step.
-     *
-     * @return Result of training loop step.
-     */
-    protected abstract ComputationsChain<LC, K, V, I, I> trainingLoopStep();
-
-    /**
-     * Condition specifying if training loop should continue.
-     *
-     * @param data First time, data returned by locallyProcessInitData then data returned by last step of loop.
-     * @param locCtx Local context.
-     * @return Boolean value indicating if training loop should continue.
-     */
-    protected abstract boolean shouldContinue(I data, LC locCtx);
-
-    /**
-     * Extract context for final result creation. Each key from the final keys set will be processed with
-     * finalResultsExtractor. While entry data (i.e. key and value) for each key varies, some data can be common for all
-     * processed entries. This data is called context.
-     *
-     * @param data Data returned from last training loop step.
-     * @param locCtx Local context.
-     * @return Context.
-     */
-    protected abstract IgniteSupplier<G> extractContextForFinalResultCreation(I data, LC locCtx);
-
-    /**
-     * Keys for final result creation.
-     *
-     * @param data Data returned from the last training loop step.
-     * @param locCtx Local context.
-     * @return Stream of keys for final result creation.
-     */
-    protected abstract IgniteSupplier<Stream<GroupTrainerCacheKey<K>>> finalResultKeys(I data, LC locCtx);
-
-    /**
-     * Get function for extracting final result from each key specified in finalResultKeys.
-     *
-     * @return Function for extracting final result from each key specified in finalResultKeys.
-     */
-    protected abstract IgniteFunction<EntryAndContext<K, V, G>, ResultAndUpdates<R>> finalResultsExtractor();
-
-    /**
-     * Get function for reducing final results.
-     *
-     * @return Function for reducing final results.
-     */
-    protected abstract IgniteFunction<List<R>, R> finalResultsReducer();
-
-    /**
-     * Map final result to model which is returned by trainer.
-     *
-     * @param res Final result.
-     * @param locCtx Local context.
-     * @return Model resulted from training.
-     */
-    protected abstract M mapFinalResult(R res, LC locCtx);
-
-    /**
-     * Performs cleanups of temporary objects created by this trainer.
-     *
-     * @param locCtx Local context.
-     */
-    protected abstract void cleanup(LC locCtx);
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/47cfdc27/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/GroupTrainerBaseProcessorTask.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/GroupTrainerBaseProcessorTask.java b/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/GroupTrainerBaseProcessorTask.java
deleted file mode 100644
index b192f42..0000000
--- a/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/GroupTrainerBaseProcessorTask.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.ml.trainers.group;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.UUID;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.cache.affinity.Affinity;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.compute.ComputeJob;
-import org.apache.ignite.compute.ComputeJobResult;
-import org.apache.ignite.compute.ComputeTaskAdapter;
-import org.apache.ignite.ml.math.functions.IgniteFunction;
-import org.apache.ignite.ml.math.functions.IgniteSupplier;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Base task for group trainer.
- *
- * @param <K> Type of cache keys of cache used for training.
- * @param <V> Type of cache values of cache used for training.
- * @param <C> Type of context (common part of data needed for computation).
- * @param <T> Type of arguments of workers.
- * @param <R> Type of computation result.
- */
-public abstract class GroupTrainerBaseProcessorTask<K, V, C, T, R extends Serializable> extends ComputeTaskAdapter<Void, R> {
-    /**
-     * Context supplier.
-     */
-    protected final IgniteSupplier<C> ctxSupplier;
-
-    /**
-     * UUID of training.
-     */
-    protected final UUID trainingUUID;
-
-    /**
-     * Worker.
-     */
-    protected IgniteFunction<T, ResultAndUpdates<R>> worker;
-
-    /**
-     * Reducer used for reducing of computations on specified keys.
-     */
-    protected final IgniteFunction<List<R>, R> reducer;
-
-    /**
-     * Name of cache on which training is done.
-     */
-    protected final String cacheName;
-
-    /**
-     * Supplier of keys on which worker should be executed.
-     */
-    protected final IgniteSupplier<Stream<GroupTrainerCacheKey<K>>> keysSupplier;
-
-    /**
-     * Ignite instance.
-     */
-    protected final Ignite ignite;
-
-    /**
-     * Construct an instance of this class with specified parameters.
-     *
-     * @param trainingUUID UUID of training.
-     * @param ctxSupplier Supplier of context.
-     * @param worker Function calculated on each of specified keys.
-     * @param keysSupplier Supplier of keys on which training is done.
-     * @param reducer Reducer used for reducing results of computation performed on each of specified keys.
-     * @param cacheName Name of cache on which training is done.
-     * @param ignite Ignite instance.
-     */
-    public GroupTrainerBaseProcessorTask(UUID trainingUUID,
-        IgniteSupplier<C> ctxSupplier,
-        IgniteFunction<T, ResultAndUpdates<R>> worker,
-        IgniteSupplier<Stream<GroupTrainerCacheKey<K>>> keysSupplier,
-        IgniteFunction<List<R>, R> reducer,
-        String cacheName,
-        Ignite ignite) {
-        this.trainingUUID = trainingUUID;
-        this.ctxSupplier = ctxSupplier;
-        this.worker = worker;
-        this.keysSupplier = keysSupplier;
-        this.reducer = reducer;
-        this.cacheName = cacheName;
-        this.ignite = ignite;
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
-        @Nullable Void arg) throws IgniteException {
-        Map<ComputeJob, ClusterNode> res = new HashMap<>();
-
-        for (ClusterNode node : subgrid) {
-            BaseLocalProcessorJob<K, V, T, R> job = createJob();
-            res.put(job, node);
-        }
-
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public R reduce(List<ComputeJobResult> results) throws IgniteException {
-        return reducer.apply(results.stream().map(res -> (R)res.getData()).filter(Objects::nonNull).collect(Collectors.toList()));
-    }
-
-    /**
-     * Create job for execution on subgrid.
-     *
-     * @return Job for execution on subgrid.
-     */
-    protected abstract BaseLocalProcessorJob<K, V, T, R> createJob();
-
-    /**
-     * Get affinity function of cache on which training is done.
-     *
-     * @return Affinity function of cache on which training is done.
-     */
-    protected Affinity<GroupTrainerCacheKey> affinity() {
-        return ignite.affinity(cacheName);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/47cfdc27/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/GroupTrainerCacheKey.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/GroupTrainerCacheKey.java b/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/GroupTrainerCacheKey.java
deleted file mode 100644
index 5e4cb76..0000000
--- a/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/GroupTrainerCacheKey.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.ml.trainers.group;
-
-import java.util.UUID;
-import org.apache.ignite.cache.affinity.AffinityKeyMapped;
-
-/**
- * Class used as a key for caches on which {@link GroupTrainer} works.
- * Structurally it is a triple: (nodeLocalEntityIndex, trainingUUID, data);
- * nodeLocalEntityIndex is used to map key to node;
- * trainingUUID is id of training;
- * data is some custom data stored in this key, for example if we want to store three neural networks on one node
- * for training with training UUID == trainingUUID, we can use keys
- * (1, trainingUUID, networkIdx1), (1, trainingUUID, networkIdx2), (1, trainingUUID, networkIdx3).
- *
- * @param <K> Type of data part of this key.
- */
-public class GroupTrainerCacheKey<K> {
-    /**
-     * Part of key for key-to-node affinity.
-     */
-    @AffinityKeyMapped
-    private Long nodeLocEntityIdx;
-
-    /**
-     * UUID of training.
-     */
-    private UUID trainingUUID;
-
-    /**
-     * Data.
-     */
-    K data;
-
-    /**
-     * Construct instance of this class.
-     *
-     * @param nodeLocEntityIdx Part of key for key-to-node affinity.
-     * @param data Data.
-     * @param trainingUUID Training UUID.
-     */
-    public GroupTrainerCacheKey(long nodeLocEntityIdx, K data, UUID trainingUUID) {
-        this.nodeLocEntityIdx = nodeLocEntityIdx;
-        this.trainingUUID = trainingUUID;
-        this.data = data;
-    }
-
-    /**
-     * Construct instance of this class.
-     *
-     * @param nodeLocEntityIdx Part of key for key-to-node affinity.
-     * @param data Data.
-     * @param trainingUUID Training UUID.
-     */
-    public GroupTrainerCacheKey(int nodeLocEntityIdx, K data, UUID trainingUUID) {
-        this((long)nodeLocEntityIdx, data, trainingUUID);
-    }
-
-    /**
-     * Get part of key used for key-to-node affinity.
-     *
-     * @return Part of key used for key-to-node affinity.
-     */
-    public Long nodeLocalEntityIndex() {
-        return nodeLocEntityIdx;
-    }
-
-    /**
-     * Get UUID of training.
-     *
-     * @return UUID of training.
-     */
-    public UUID trainingUUID() {
-        return trainingUUID;
-    }
-
-    /**
-     * Get data.
-     *
-     * @return Data.
-     */
-    public K data() {
-        return data;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean equals(Object o) {
-        if (this == o)
-            return true;
-        if (o == null || getClass() != o.getClass())
-            return false;
-
-        GroupTrainerCacheKey<?> key = (GroupTrainerCacheKey<?>)o;
-
-        if (nodeLocEntityIdx != null ? !nodeLocEntityIdx.equals(key.nodeLocEntityIdx) : key.nodeLocEntityIdx != null)
-            return false;
-        if (trainingUUID != null ? !trainingUUID.equals(key.trainingUUID) : key.trainingUUID != null)
-            return false;
-            return false;
-        return data != null ? data.equals(key.data) : key.data == null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int hashCode() {
-        int res = nodeLocEntityIdx != null ? nodeLocEntityIdx.hashCode() : 0;
-        res = 31 * res + (trainingUUID != null ? trainingUUID.hashCode() : 0);
-        res = 31 * res + (data != null ? data.hashCode() : 0);
-        return res;
-    }
-}
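For reference, a minimal sketch of how the removed key class was used to colocate several entities of one training on a single node (the three-network example from the Javadoc above); the Integer data part standing in for a network index is an assumption for illustration:

import java.util.UUID;
import org.apache.ignite.ml.trainers.group.GroupTrainerCacheKey;

public class CacheKeyExample {
    public static void main(String[] args) {
        UUID trainingUUID = UUID.randomUUID();

        // All three keys share nodeLocalEntityIndex == 1, so @AffinityKeyMapped sends them to the same node.
        GroupTrainerCacheKey<Integer> k1 = new GroupTrainerCacheKey<>(1, 0, trainingUUID);
        GroupTrainerCacheKey<Integer> k2 = new GroupTrainerCacheKey<>(1, 1, trainingUUID);
        GroupTrainerCacheKey<Integer> k3 = new GroupTrainerCacheKey<>(1, 2, trainingUUID);

        // The keys differ only in their data part, so they are distinct cache entries on one node.
        System.out.println(k1.equals(k2)); // false
        System.out.println(k1.nodeLocalEntityIndex().equals(k3.nodeLocalEntityIndex())); // true
    }
}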

http://git-wip-us.apache.org/repos/asf/ignite/blob/47cfdc27/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/GroupTrainerEntriesProcessorTask.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/GroupTrainerEntriesProcessorTask.java b/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/GroupTrainerEntriesProcessorTask.java
deleted file mode 100644
index daa396f..0000000
--- a/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/GroupTrainerEntriesProcessorTask.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.ml.trainers.group;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.UUID;
-import java.util.stream.Stream;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.ml.math.functions.IgniteFunction;
-import org.apache.ignite.ml.math.functions.IgniteSupplier;
-import org.apache.ignite.ml.trainers.group.chain.EntryAndContext;
-
-/**
- * Task for processing entries of cache used for training.
- *
- * @param <K> Type of cache keys of cache used for training.
- * @param <V> Type of cache values of cache used for training.
- * @param <C> Type of context (common part of data needed for computation).
- * @param <R> Type of computation result.
- */
-public class GroupTrainerEntriesProcessorTask<K, V, C, R extends Serializable>
-    extends GroupTrainerBaseProcessorTask<K, V, C, EntryAndContext<K, V, C>, R> {
-    /**
-     * Construct instance of this class with given parameters.
-     *
-     * @param trainingUUID UUID of training.
-     * @param ctxSupplier Supplier of context.
-     * @param worker Function calculated on each of specified keys.
-     * @param keysSupplier Supplier of keys on which training is done.
-     * @param reducer Reducer used for reducing results of computation performed on each of specified keys.
-     * @param cacheName Name of cache on which training is done.
-     * @param ignite Ignite instance.
-     */
-    public GroupTrainerEntriesProcessorTask(UUID trainingUUID,
-        IgniteSupplier<C> ctxSupplier,
-        IgniteFunction<EntryAndContext<K, V, C>, ResultAndUpdates<R>> worker,
-        IgniteSupplier<Stream<GroupTrainerCacheKey<K>>> keysSupplier,
-        IgniteFunction<List<R>, R> reducer,
-        String cacheName,
-        Ignite ignite) {
-        super(trainingUUID, ctxSupplier, worker, keysSupplier, reducer, cacheName, ignite);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected BaseLocalProcessorJob<K, V, EntryAndContext<K, V, C>, R> createJob() {
-        return new LocalEntriesProcessorJob<>(ctxSupplier, worker, keysSupplier, reducer, trainingUUID, cacheName);
-    }
-}
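A sketch of wiring this task together, assuming an already-started Ignite instance; the cache name "trainingCache" and all type choices are hypothetical, and the worker ignores its entry and emits a constant purely to show the shapes of the arguments:

import java.util.List;
import java.util.UUID;
import java.util.stream.Stream;
import org.apache.ignite.Ignite;
import org.apache.ignite.ml.math.functions.IgniteFunction;
import org.apache.ignite.ml.math.functions.IgniteSupplier;
import org.apache.ignite.ml.trainers.group.GroupTrainerCacheKey;
import org.apache.ignite.ml.trainers.group.GroupTrainerEntriesProcessorTask;
import org.apache.ignite.ml.trainers.group.ResultAndUpdates;
import org.apache.ignite.ml.trainers.group.chain.EntryAndContext;

public class EntriesTaskWiring {
    static GroupTrainerEntriesProcessorTask<Integer, double[], String, Double> createTask(Ignite ignite,
        UUID trainingUUID) {
        // Context shared by all workers of this step.
        IgniteSupplier<String> ctxSupplier = () -> "shared-context";

        // A real worker would read the entry and the context from entryAndCtx.
        IgniteFunction<EntryAndContext<Integer, double[], String>, ResultAndUpdates<Double>> worker =
            entryAndCtx -> ResultAndUpdates.of(0.0);

        // Keys to process: here a single key on node 0.
        IgniteSupplier<Stream<GroupTrainerCacheKey<Integer>>> keysSupplier =
            () -> Stream.of(new GroupTrainerCacheKey<>(0, 0, trainingUUID));

        // Reduce per-key results by summation.
        IgniteFunction<List<Double>, Double> reducer = list -> list.stream().mapToDouble(Double::doubleValue).sum();

        return new GroupTrainerEntriesProcessorTask<>(trainingUUID, ctxSupplier, worker, keysSupplier, reducer,
            "trainingCache", ignite);
    }
}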

http://git-wip-us.apache.org/repos/asf/ignite/blob/47cfdc27/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/GroupTrainerInput.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/GroupTrainerInput.java b/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/GroupTrainerInput.java
deleted file mode 100644
index ae75f16..0000000
--- a/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/GroupTrainerInput.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.ml.trainers.group;
-
-import java.util.UUID;
-import java.util.stream.Stream;
-import org.apache.ignite.ml.math.functions.IgniteSupplier;
-
-/**
- * Interface for {@link GroupTrainer} inputs.
- *
- * @param <K> Type of cache keys used for group training.
- */
-public interface GroupTrainerInput<K> {
-    /**
-     * Get supplier of stream of keys used for initialization of {@link GroupTrainer}.
-     *
-     * @param trainingUUID UUID of training.
-     * @return Supplier of stream of keys used for initialization of {@link GroupTrainer}.
-     */
-    IgniteSupplier<Stream<GroupTrainerCacheKey<K>>> initialKeys(UUID trainingUUID);
-}
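A minimal sketch of an implementation of this interface; the choice of three entities with Integer data parts mirroring their indices is an assumption for illustration:

import java.util.UUID;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.ignite.ml.math.functions.IgniteSupplier;
import org.apache.ignite.ml.trainers.group.GroupTrainerCacheKey;
import org.apache.ignite.ml.trainers.group.GroupTrainerInput;

public class ThreeEntitiesInput implements GroupTrainerInput<Integer> {
    /** {@inheritDoc} */
    @Override public IgniteSupplier<Stream<GroupTrainerCacheKey<Integer>>> initialKeys(UUID trainingUUID) {
        // The supplier is serialized and evaluated on remote nodes, so it must capture
        // only serializable state (here just trainingUUID).
        return () -> IntStream.range(0, 3).boxed().map(i -> new GroupTrainerCacheKey<>(i, i, trainingUUID));
    }
}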

http://git-wip-us.apache.org/repos/asf/ignite/blob/47cfdc27/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/GroupTrainerKeysProcessorTask.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/GroupTrainerKeysProcessorTask.java b/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/GroupTrainerKeysProcessorTask.java
deleted file mode 100644
index 7ac18f8..0000000
--- a/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/GroupTrainerKeysProcessorTask.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.ml.trainers.group;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.UUID;
-import java.util.stream.Stream;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.ml.math.functions.IgniteFunction;
-import org.apache.ignite.ml.math.functions.IgniteSupplier;
-import org.apache.ignite.ml.trainers.group.chain.KeyAndContext;
-
-/**
- * Task for processing keys of cache used for training.
- *
- * @param <K> Type of cache keys of cache used for training.
- * @param <C> Type of context (common part of data needed for computation).
- * @param <R> Type of computation result.
- */
-public class GroupTrainerKeysProcessorTask<K, C, R extends Serializable> extends GroupTrainerBaseProcessorTask<K, Object, C, KeyAndContext<K, C>, R> {
-    /**
-     * Construct instance of this class with specified parameters.
-     *
-     * @param trainingUUID UUID of training.
-     * @param ctxSupplier Context supplier.
-     * @param worker Function calculated on each of specified keys.
-     * @param keysSupplier Supplier of keys on which computations should be done.
-     * @param reducer Reducer used for reducing results of computation performed on each of specified keys.
-     * @param cacheName Name of cache on which training is done.
-     * @param ignite Ignite instance.
-     */
-    public GroupTrainerKeysProcessorTask(UUID trainingUUID,
-        IgniteSupplier<C> ctxSupplier,
-        IgniteFunction<KeyAndContext<K, C>, ResultAndUpdates<R>> worker,
-        IgniteSupplier<Stream<GroupTrainerCacheKey<K>>> keysSupplier,
-        IgniteFunction<List<R>, R> reducer,
-        String cacheName,
-        Ignite ignite) {
-        super(trainingUUID, ctxSupplier, worker, keysSupplier, reducer, cacheName, ignite);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected BaseLocalProcessorJob<K, Object, KeyAndContext<K, C>, R> createJob() {
-        return new LocalKeysProcessorJob<>(ctxSupplier, worker, keysSupplier, reducer, trainingUUID, cacheName);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/47cfdc27/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/GroupTrainingContext.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/GroupTrainingContext.java b/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/GroupTrainingContext.java
deleted file mode 100644
index cbd04b2..0000000
--- a/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/GroupTrainingContext.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.ml.trainers.group;
-
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.ml.trainers.group.chain.HasTrainingUUID;
-
-/**
- * Context for group training.
- *
- * @param <K> Type of keys of cache used for group training.
- * @param <V> Type of values of cache used for group training.
- * @param <L> Type of local context used for training.
- */
-public class GroupTrainingContext<K, V, L extends HasTrainingUUID> {
-    /**
-     * Local context.
-     */
-    private L locCtx;
-
-    /**
-     * Cache used for training.
-     */
-    private IgniteCache<GroupTrainerCacheKey<K>, V> cache;
-
-    /**
-     * Ignite instance.
-     */
-    private Ignite ignite;
-
-    /**
-     * Construct instance of this class.
-     *
-     * @param locCtx Local context.
-     * @param cache Cache used for training.
-     * @param ignite Ignite instance.
-     */
-    public GroupTrainingContext(L locCtx, IgniteCache<GroupTrainerCacheKey<K>, V> cache, Ignite ignite) {
-        this.locCtx = locCtx;
-        this.cache = cache;
-        this.ignite = ignite;
-    }
-
-    /**
-     * Construct new training context with same parameters but with new cache.
-     *
-     * @param newCache New cache.
-     * @param <K1> Type of keys of new cache.
-     * @param <V1> Type of values of new cache.
-     * @return New training context with same parameters but with new cache.
-     */
-    public <K1, V1> GroupTrainingContext<K1, V1, L> withCache(IgniteCache<GroupTrainerCacheKey<K1>, V1> newCache) {
-        return new GroupTrainingContext<>(locCtx, newCache, ignite);
-    }
-
-    /**
-     * Get local context.
-     *
-     * @return Local context.
-     */
-    public L localContext() {
-        return locCtx;
-    }
-
-    /**
-     * Get cache used for training.
-     *
-     * @return Cache used for training.
-     */
-    public IgniteCache<GroupTrainerCacheKey<K>, V> cache() {
-        return cache;
-    }
-
-    /**
-     * Get Ignite instance.
-     *
-     * @return Ignite instance.
-     */
-    public Ignite ignite() {
-        return ignite;
-    }
-}
\ No newline at end of file
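A sketch of how a training step could use withCache to switch the context to another cache while keeping the same local context and Ignite instance; the gradients cache name and value type are hypothetical:

import org.apache.ignite.IgniteCache;
import org.apache.ignite.ml.trainers.group.GroupTrainerCacheKey;
import org.apache.ignite.ml.trainers.group.GroupTrainingContext;
import org.apache.ignite.ml.trainers.group.chain.HasTrainingUUID;

public class ContextSwitchExample {
    static <L extends HasTrainingUUID> GroupTrainingContext<Integer, double[], L> switchToGradients(
        GroupTrainingContext<Integer, String, L> ctx) {
        // Hypothetical cache holding per-entity gradients of the same training.
        IgniteCache<GroupTrainerCacheKey<Integer>, double[]> gradients =
            ctx.ignite().getOrCreateCache("gradientsCache");

        // Same local context and Ignite instance, different cache and key/value types.
        return ctx.withCache(gradients);
    }
}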

http://git-wip-us.apache.org/repos/asf/ignite/blob/47cfdc27/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/LocalEntriesProcessorJob.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/LocalEntriesProcessorJob.java b/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/LocalEntriesProcessorJob.java
deleted file mode 100644
index d035aa5..0000000
--- a/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/LocalEntriesProcessorJob.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.ml.trainers.group;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.UUID;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import org.apache.ignite.ml.math.functions.IgniteFunction;
-import org.apache.ignite.ml.math.functions.IgniteSupplier;
-import org.apache.ignite.ml.trainers.group.chain.EntryAndContext;
-
-/**
- * {@link BaseLocalProcessorJob} specialized for entry processing.
- *
- * @param <K> Type of cache keys used for group training.
- * @param <V> Type of values used for group training.
- * @param <C> Type of context.
- * @param <R> Type of result returned by worker.
- */
-public class LocalEntriesProcessorJob<K, V, C, R extends Serializable> extends BaseLocalProcessorJob<K, V, EntryAndContext<K, V, C>, R> {
-    /**
-     * Supplier of context for worker.
-     */
-    private final IgniteSupplier<C> ctxSupplier;
-
-    /**
-     * Construct an instance of this class.
-     *
-     * @param ctxSupplier Supplier of context for worker.
-     * @param worker Worker.
-     * @param keySupplier Supplier of keys.
-     * @param reducer Reducer.
-     * @param trainingUUID UUID for training.
-     * @param cacheName Name of cache used for training.
-     */
-    public LocalEntriesProcessorJob(IgniteSupplier<C> ctxSupplier,
-        IgniteFunction<EntryAndContext<K, V, C>, ResultAndUpdates<R>> worker,
-        IgniteSupplier<Stream<GroupTrainerCacheKey<K>>> keySupplier,
-        IgniteFunction<List<R>, R> reducer,
-        UUID trainingUUID, String cacheName) {
-        super(worker, keySupplier, reducer, trainingUUID, cacheName);
-        this.ctxSupplier = ctxSupplier;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected Stream<EntryAndContext<K, V, C>> toProcess() {
-        C ctx = ctxSupplier.get();
-
-        return selectLocalEntries().map(e -> new EntryAndContext<>(e, ctx));
-    }
-
-    /**
-     * Select entries for processing by worker.
-     *
-     * @return Entries for processing by worker.
-     */
-    private Stream<Map.Entry<GroupTrainerCacheKey<K>, V>> selectLocalEntries() {
-        Set<GroupTrainerCacheKey<K>> keys = keySupplier.get().
-            filter(k -> Objects.requireNonNull(affinity().mapKeyToNode(k)).isLocal()).
-            filter(k -> k.trainingUUID().equals(trainingUUID)).
-            collect(Collectors.toSet());
-
-        return cache().getAll(keys).entrySet().stream();
-    }
-}
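The selectLocalEntries method above keeps only the keys whose primary copy lives on the current node and then reads them with one bulk getAll, so each node processes a disjoint subset. A standalone sketch of the same affinity-filtering pattern against a plain Ignite cache (the cache name parameter and the generic key type are assumptions):

import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.ignite.Ignite;
import org.apache.ignite.cache.affinity.Affinity;

public class LocalSelectionSketch {
    static <K> Set<K> selectLocal(Ignite ignite, String cacheName, Stream<K> candidates) {
        Affinity<K> affinity = ignite.affinity(cacheName);

        // Keep only keys mapped to the local node.
        return candidates
            .filter(k -> Objects.requireNonNull(affinity.mapKeyToNode(k)).isLocal())
            .collect(Collectors.toSet());
    }
}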

http://git-wip-us.apache.org/repos/asf/ignite/blob/47cfdc27/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/LocalKeysProcessorJob.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/LocalKeysProcessorJob.java b/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/LocalKeysProcessorJob.java
deleted file mode 100644
index cad53c9..0000000
--- a/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/LocalKeysProcessorJob.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.ml.trainers.group;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.Objects;
-import java.util.UUID;
-import java.util.stream.Stream;
-import org.apache.ignite.ml.math.functions.IgniteFunction;
-import org.apache.ignite.ml.math.functions.IgniteSupplier;
-import org.apache.ignite.ml.trainers.group.chain.KeyAndContext;
-
-/**
- * {@link BaseLocalProcessorJob} specialized for key processing.
- *
- * @param <K> Type of cache keys used for group training.
- * @param <V> Type of values used for group training.
- * @param <C> Type of context.
- * @param <R> Type of result returned by worker.
- */
-public class LocalKeysProcessorJob<K, V, C, R extends Serializable> extends BaseLocalProcessorJob<K, V, KeyAndContext<K, C>, R> {
-    /**
-     * Supplier of worker context.
-     */
-    private final IgniteSupplier<C> ctxSupplier;
-
-    /**
-     * Construct instance of this class with given arguments.
-     *
-     * @param ctxSupplier Supplier of worker context.
-     * @param worker Worker.
-     * @param keySupplier Supplier of keys.
-     * @param reducer Reducer.
-     * @param trainingUUID UUID of training.
-     * @param cacheName Name of cache used for training.
-     */
-    public LocalKeysProcessorJob(IgniteSupplier<C> ctxSupplier,
-        IgniteFunction<KeyAndContext<K, C>, ResultAndUpdates<R>> worker,
-        IgniteSupplier<Stream<GroupTrainerCacheKey<K>>> keySupplier,
-        IgniteFunction<List<R>, R> reducer,
-        UUID trainingUUID, String cacheName) {
-        super(worker, keySupplier, reducer, trainingUUID, cacheName);
-        this.ctxSupplier = ctxSupplier;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected Stream<KeyAndContext<K, C>> toProcess() {
-        C ctx = ctxSupplier.get();
-
-        return selectLocalKeys().map(k -> new KeyAndContext<>(k, ctx));
-    }
-
-    /**
-     * Get subset of keys provided by keySupplier which are mapped to the node on which the code is executed.
-     *
-     * @return Subset of keys provided by keySupplier which are mapped to the node on which the code is executed.
-     */
-    private Stream<GroupTrainerCacheKey<K>> selectLocalKeys() {
-        return keySupplier.get().
-            filter(k -> Objects.requireNonNull(affinity().mapKeyToNode(k)).isLocal()).
-            filter(k -> k.trainingUUID().equals(trainingUUID));
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/47cfdc27/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/Metaoptimizer.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/Metaoptimizer.java b/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/Metaoptimizer.java
deleted file mode 100644
index 0ab6d32..0000000
--- a/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/Metaoptimizer.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.ml.trainers.group;
-
-import java.util.List;
-import org.apache.ignite.ml.math.functions.IgniteFunction;
-
-/**
- * Class encapsulating data transformations in group training in {@link MetaoptimizerGroupTrainer}, which is an adapter
- * of {@link GroupTrainer}.
- *
- * @param <LC> Type of local context of {@link GroupTrainer}.
- * @param <X> Type of data which is processed in training loop step.
- * @param <Y> Type of data returned by training loop step data processor.
- * @param <I> Type of data to which data returned by distributed initialization is mapped.
- * @param <D> Type of data returned by initialization.
- * @param <O> Type of data to which data returned by data processor is mapped.
-public interface Metaoptimizer<LC, X, Y, I, D, O> {
-    /**
-     * Get function used to reduce distributed initialization results.
-     *
-     * @return Function used to reduce distributed initialization results.
-     */
-    IgniteFunction<List<D>, D> initialReducer();
-
-    /**
-     * Maps data returned by distributed initialization to data consumed by training loop step.
-     *
-     * @param data Data returned by distributed initialization.
-     * @param locCtx Local context.
-     * @return Mapping of data returned by distributed initialization to data consumed by training loop step.
-     */
-    I locallyProcessInitData(D data, LC locCtx);
-
-    /**
-     * Get function used to preprocess data for {@link MetaoptimizerGroupTrainer#dataProcessor()}.
-     *
-     * @return Function used to preprocess data for {@link MetaoptimizerGroupTrainer#dataProcessor()}.
-     */
-    default IgniteFunction<X, X> distributedPreprocessor() {
-        return x -> x;
-    }
-
-    /**
-     * Get function used to map values returned by {@link MetaoptimizerGroupTrainer#dataProcessor()}.
-     *
-     * @return Function used to map values returned by {@link MetaoptimizerGroupTrainer#dataProcessor()}.
-     */
-    IgniteFunction<Y, O> distributedPostprocessor();
-
-    /**
-     * Get function used for reducing results returned by distributedPostprocessor.
-     *
-     * @return Function used for reducing results returned by distributedPostprocessor.
-     */
-    IgniteFunction<List<O>, O> postProcessReducer();
-
-    /**
-     * Transform data returned by the distributed part of the training loop step into input fed into the distributed
-     * part of the training loop step.
-     *
-     * @param input Output of distributed part of training loop step.
-     * @param locCtx Local context.
-     * @return Result of transforming data returned by the distributed part of the training loop step into input fed
-     * into the distributed part of the training loop step.
-     */
-    I localProcessor(O input, LC locCtx);
-
-    /**
-     * Returns value of predicate 'should training loop continue given previous step output and local context'.
-     *
-     * @param input Input of previous step.
-     * @param locCtx Local context.
-     * @return Value of predicate 'should training loop continue given previous step output and local context'.
-     */
-    boolean shouldContinue(I input, LC locCtx);
-}
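A minimal sketch of an implementation of this interface that passes data through unchanged and stops the training loop after a fixed number of steps; the all-Double type choices and the CountingContext local context are assumptions for illustration:

import java.util.List;
import org.apache.ignite.ml.math.functions.IgniteFunction;
import org.apache.ignite.ml.trainers.group.Metaoptimizer;

public class PassThroughMetaoptimizer
    implements Metaoptimizer<PassThroughMetaoptimizer.CountingContext, Double, Double, Double, Double, Double> {
    /** Hypothetical local context counting training loop steps. */
    public static class CountingContext {
        private int step;

        void incrementStep() { step++; }

        int step() { return step; }
    }

    /** Maximum number of training loop steps. */
    private final int maxSteps;

    public PassThroughMetaoptimizer(int maxSteps) {
        this.maxSteps = maxSteps;
    }

    /** Average results of distributed initialization. */
    @Override public IgniteFunction<List<Double>, Double> initialReducer() {
        return list -> list.stream().mapToDouble(Double::doubleValue).average().orElse(0.0);
    }

    /** {@inheritDoc} */
    @Override public Double locallyProcessInitData(Double data, CountingContext locCtx) {
        return data;
    }

    /** {@inheritDoc} */
    @Override public IgniteFunction<Double, Double> distributedPostprocessor() {
        return y -> y;
    }

    /** Average results of parallel trainings. */
    @Override public IgniteFunction<List<Double>, Double> postProcessReducer() {
        return list -> list.stream().mapToDouble(Double::doubleValue).average().orElse(0.0);
    }

    /** Count the step locally and pass the data through. */
    @Override public Double localProcessor(Double input, CountingContext locCtx) {
        locCtx.incrementStep();
        return input;
    }

    /** {@inheritDoc} */
    @Override public boolean shouldContinue(Double input, CountingContext locCtx) {
        return locCtx.step() < maxSteps;
    }
}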

http://git-wip-us.apache.org/repos/asf/ignite/blob/47cfdc27/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/MetaoptimizerDistributedStep.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/MetaoptimizerDistributedStep.java b/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/MetaoptimizerDistributedStep.java
deleted file mode 100644
index 08e1f47..0000000
--- a/modules/ml/src/main/java/org/apache/ignite/ml/trainers/group/MetaoptimizerDistributedStep.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.ml.trainers.group;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.stream.Stream;
-import org.apache.ignite.ml.math.functions.IgniteFunction;
-import org.apache.ignite.ml.math.functions.IgniteSupplier;
-import org.apache.ignite.ml.trainers.group.chain.DistributedEntryProcessingStep;
-import org.apache.ignite.ml.trainers.group.chain.EntryAndContext;
-import org.apache.ignite.ml.trainers.group.chain.HasTrainingUUID;
-
-/**
- * Distributed step based on {@link Metaoptimizer}.
- *
- * @param <L> Type of local context.
- * @param <K> Type of data in {@link GroupTrainerCacheKey}.
- * @param <V> Type of values of cache on which training is done.
- * @param <G> Type of distributed context.
- * @param <I> Type of data to which data returned by distributed initialization is mapped (see {@link Metaoptimizer}).
- * @param <O> Type of data to which data returned by data processor is mapped (see {@link Metaoptimizer}).
- * @param <X> Type of data which is processed in training loop step (see {@link Metaoptimizer}).
- * @param <Y> Type of data returned by training loop step data processor (see {@link Metaoptimizer}).
- * @param <D> Type of data returned by initialization (see {@link Metaoptimizer}).
- */
-class MetaoptimizerDistributedStep<L extends HasTrainingUUID, K, V, G, I extends Serializable, O extends Serializable,
-    X, Y, D extends Serializable> implements DistributedEntryProcessingStep<L, K, V, G, I, O> {
-    /**
-     * {@link Metaoptimizer}.
-     */
-    private final Metaoptimizer<L, X, Y, I, D, O> metaoptimizer;
-
-    /**
-     * {@link MetaoptimizerGroupTrainer} for which this distributed step is used.
-     */
-    private final MetaoptimizerGroupTrainer<L, K, V, D, ?, I, ?, ?, G, O, X, Y> trainer;
-
-    /**
-     * Construct instance of this class with given parameters.
-     *
-     * @param metaoptimizer Metaoptimizer.
-     * @param trainer {@link MetaoptimizerGroupTrainer} for which this distributed step is used.
-     */
-    public MetaoptimizerDistributedStep(Metaoptimizer<L, X, Y, I, D, O> metaoptimizer,
-        MetaoptimizerGroupTrainer<L, K, V, D, ?, I, ?, ?, G, O, X, Y> trainer) {
-        this.metaoptimizer = metaoptimizer;
-        this.trainer = trainer;
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteSupplier<G> remoteContextSupplier(I input, L locCtx) {
-        return trainer.remoteContextExtractor(input, locCtx);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteFunction<EntryAndContext<K, V, G>, ResultAndUpdates<O>> worker() {
-        IgniteFunction<X, ResultAndUpdates<Y>> dataProcessor = trainer.dataProcessor();
-        IgniteFunction<X, X> preprocessor = metaoptimizer.distributedPreprocessor();
-        IgniteFunction<Y, O> postprocessor = metaoptimizer.distributedPostprocessor();
-        IgniteFunction<EntryAndContext<K, V, G>, X> ctxExtractor = trainer.trainingLoopStepDataExtractor();
-
-        return entryAndCtx -> {
-            // Extract step data, preprocess it (the preprocessed value, not the raw one, is fed to the
-            // data processor), process it and postprocess the result; updates are carried through unchanged.
-            X data = preprocessor.apply(ctxExtractor.apply(entryAndCtx));
-            ResultAndUpdates<Y> res = dataProcessor.apply(data);
-            O postprocessRes = postprocessor.apply(res.result());
-
-            return ResultAndUpdates.of(postprocessRes).setUpdates(res.updates());
-        };
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteSupplier<Stream<GroupTrainerCacheKey<K>>> keys(I input, L locCtx) {
-        return trainer.keysToProcessInTrainingLoop(locCtx);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteFunction<List<O>, O> reducer() {
-        return metaoptimizer.postProcessReducer();
-    }
-}
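The worker above is a four-stage pipeline: extract step data from the entry and context, preprocess, process, then postprocess the result while the updates travel through unchanged. A standalone sketch of that shape with hypothetical stage functions and a simplified stand-in for ResultAndUpdates:

import java.util.function.Function;

public class WorkerPipelineSketch {
    /** Simplified stand-in for ResultAndUpdates: a result plus a side-channel payload. */
    static final class Result<R> {
        final R result;
        final String updates;

        Result(R result, String updates) {
            this.result = result;
            this.updates = updates;
        }
    }

    static <E, X, Y, O> Function<E, Result<O>> worker(
        Function<E, X> extractor,
        Function<X, X> preprocessor,
        Function<X, Result<Y>> processor,
        Function<Y, O> postprocessor) {
        return entry -> {
            // Feed the preprocessed value (not the raw one) into the processor.
            X data = preprocessor.apply(extractor.apply(entry));
            Result<Y> res = processor.apply(data);

            // The postprocessor maps only the result; updates pass through unchanged.
            return new Result<>(postprocessor.apply(res.result), res.updates);
        };
    }
}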
