IGNITE-7437: Partition based dataset implementation

this closes #3410


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/54bac750
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/54bac750
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/54bac750

Branch: refs/heads/ignite-7485-2
Commit: 54bac750098bba32993754935e867bfbdd42ebef
Parents: eee4f17
Author: dmitrievanthony <dmitrievanth...@gmail.com>
Authored: Sun Feb 4 21:06:02 2018 +0300
Committer: Yury Babak <yba...@gridgain.com>
Committed: Sun Feb 4 21:06:02 2018 +0300

----------------------------------------------------------------------
 .../AlgorithmSpecificDatasetExample.java        | 195 ++++++++++
 .../ml/dataset/CacheBasedDatasetExample.java    |  90 +++++
 .../ml/dataset/LocalDatasetExample.java         |  84 ++++
 .../examples/ml/dataset/model/Person.java       |  58 +++
 .../examples/ml/dataset/model/package-info.java |  22 ++
 .../examples/ml/dataset/package-info.java       |  22 ++
 .../ml/preprocessing/NormalizationExample.java  | 109 ++++++
 .../examples/ml/preprocessing/package-info.java |  22 ++
 modules/ml/pom.xml                              |   7 +
 .../org/apache/ignite/ml/dataset/Dataset.java   | 213 ++++++++++
 .../ignite/ml/dataset/DatasetBuilder.java       |  49 +++
 .../ignite/ml/dataset/DatasetFactory.java       | 387 +++++++++++++++++++
 .../ml/dataset/PartitionContextBuilder.java     |  58 +++
 .../ignite/ml/dataset/PartitionDataBuilder.java |  63 +++
 .../apache/ignite/ml/dataset/UpstreamEntry.java |  53 +++
 .../dataset/impl/cache/CacheBasedDataset.java   | 168 ++++++++
 .../impl/cache/CacheBasedDatasetBuilder.java    |  95 +++++
 .../ml/dataset/impl/cache/package-info.java     |  22 ++
 .../dataset/impl/cache/util/ComputeUtils.java   | 251 ++++++++++++
 .../util/DatasetAffinityFunctionWrapper.java    |  75 ++++
 .../impl/cache/util/PartitionDataStorage.java   |  65 ++++
 .../impl/cache/util/UpstreamCursorAdapter.java  |  68 ++++
 .../dataset/impl/cache/util/package-info.java   |  22 ++
 .../ml/dataset/impl/local/LocalDataset.java     |  88 +++++
 .../dataset/impl/local/LocalDatasetBuilder.java | 137 +++++++
 .../ml/dataset/impl/local/package-info.java     |  22 ++
 .../ignite/ml/dataset/impl/package-info.java    |  22 ++
 .../apache/ignite/ml/dataset/package-info.java  |  22 ++
 .../ml/dataset/primitive/DatasetWrapper.java    |  63 +++
 .../ml/dataset/primitive/SimpleDataset.java     | 216 +++++++++++
 .../dataset/primitive/SimpleLabeledDataset.java |  39 ++
 .../builder/context/EmptyContextBuilder.java    |  39 ++
 .../primitive/builder/context/package-info.java |  22 ++
 .../builder/data/SimpleDatasetDataBuilder.java  |  76 ++++
 .../data/SimpleLabeledDatasetDataBuilder.java   |  86 +++++
 .../primitive/builder/data/package-info.java    |  22 ++
 .../dataset/primitive/builder/package-info.java |  22 ++
 .../dataset/primitive/context/EmptyContext.java |  28 ++
 .../dataset/primitive/context/package-info.java |  22 ++
 .../primitive/data/SimpleDatasetData.java       |  69 ++++
 .../data/SimpleLabeledDatasetData.java          |  79 ++++
 .../ml/dataset/primitive/data/package-info.java |  22 ++
 .../ml/dataset/primitive/package-info.java      |  28 ++
 .../ml/preprocessing/PreprocessingTrainer.java  |  41 ++
 .../NormalizationPartitionData.java             |  58 +++
 .../NormalizationPreprocessor.java              |  88 +++++
 .../normalization/NormalizationTrainer.java     |  90 +++++
 .../normalization/package-info.java             |  22 ++
 .../ignite/ml/preprocessing/package-info.java   |  22 ++
 .../org/apache/ignite/ml/IgniteMLTestSuite.java |   6 +-
 .../ignite/ml/dataset/DatasetTestSuite.java     |  45 +++
 .../cache/CacheBasedDatasetBuilderTest.java     | 107 +++++
 .../impl/cache/CacheBasedDatasetTest.java       | 353 +++++++++++++++++
 .../impl/cache/util/ComputeUtilsTest.java       | 309 +++++++++++++++
 .../DatasetAffinityFunctionWrapperTest.java     | 110 ++++++
 .../cache/util/PartitionDataStorageTest.java    |  49 +++
 .../impl/local/LocalDatasetBuilderTest.java     |  91 +++++
 .../dataset/primitive/DatasetWrapperTest.java   |  87 +++++
 .../preprocessing/PreprocessingTestSuite.java   |  35 ++
 .../NormalizationPreprocessorTest.java          |  54 +++
 .../normalization/NormalizationTrainerTest.java |  76 ++++
 61 files changed, 4964 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/examples/src/main/java/org/apache/ignite/examples/ml/dataset/AlgorithmSpecificDatasetExample.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/ml/dataset/AlgorithmSpecificDatasetExample.java
 
b/examples/src/main/java/org/apache/ignite/examples/ml/dataset/AlgorithmSpecificDatasetExample.java
new file mode 100644
index 0000000..98f85cd
--- /dev/null
+++ 
b/examples/src/main/java/org/apache/ignite/examples/ml/dataset/AlgorithmSpecificDatasetExample.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.ml.dataset;
+
+import com.github.fommil.netlib.BLAS;
+import java.io.Serializable;
+import java.util.Arrays;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.examples.ml.dataset.model.Person;
+import org.apache.ignite.ml.dataset.Dataset;
+import org.apache.ignite.ml.dataset.DatasetFactory;
+import org.apache.ignite.ml.dataset.primitive.DatasetWrapper;
+import 
org.apache.ignite.ml.dataset.primitive.builder.data.SimpleLabeledDatasetDataBuilder;
+import org.apache.ignite.ml.dataset.primitive.data.SimpleLabeledDatasetData;
+
/**
 * Example that shows how to implement your own algorithm (a gradient descent trainer for linear regression) which uses
 * a dataset as the underlying infrastructure.
 *
 * The common idea behind using algorithm-specific datasets is to write a simple local version of the algorithm first,
 * then find the operations which involve data manipulations, and finally define an algorithm-specific version of the
 * dataset extended by introducing these new operations. As a result your algorithm will work with the extended dataset
 * (based on {@link DatasetWrapper}) in a sequential manner.
 *
 * In this example we need to implement gradient descent. This is an iterative method that involves calculation of the
 * gradient on every step. In accordance with the common idea we define {@link AlgorithmSpecificDataset} — an extended
 * version of {@code Dataset} with a {@code gradient} method. As a result our gradient descent method looks like a
 * simple loop where every iteration includes a call of the {@code gradient} method. In this example we also want to
 * keep the iteration number for logging. The iteration number cannot be recovered from the {@code upstream} data, so
 * we keep it in the custom partition {@code context} which is represented by the
 * {@link AlgorithmSpecificPartitionContext} class.
 */
public class AlgorithmSpecificDatasetExample {
    /** Run example. */
    public static void main(String[] args) throws Exception {
        try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
            System.out.println(">>> Algorithm Specific Dataset example started.");

            IgniteCache<Integer, Person> persons = createCache(ignite);

            // Creates an algorithm-specific dataset to perform linear regression. Here we define the way features and
            // labels are extracted, and how partition data and context are created.
            try (AlgorithmSpecificDataset dataset = DatasetFactory.create(
                ignite,
                persons,
                (upstream, upstreamSize) -> new AlgorithmSpecificPartitionContext(),
                new SimpleLabeledDatasetDataBuilder<Integer, Person, AlgorithmSpecificPartitionContext>(
                    (k, v) -> new double[] {v.getAge()},
                    (k, v) -> v.getSalary(),
                    1
                ).andThen((data, ctx) -> {
                    double[] features = data.getFeatures();
                    int rows = data.getRows();

                    // Makes a copy of features supplemented by an intercept column with values equal to 1.0.
                    // Features appear to be stored column-major (consistent with lda = rows in the dgemv calls
                    // below), so the column of ones occupies the first `rows` elements of the new array.
                    double[] a = new double[features.length + rows];

                    for (int i = 0; i < rows; i++)
                        a[i] = 1.0;

                    System.arraycopy(features, 0, a, rows, features.length);

                    // Column count grows by one to account for the added intercept column.
                    return new SimpleLabeledDatasetData(a, rows, data.getCols() + 1, data.getLabels());
                })
            ).wrap(AlgorithmSpecificDataset::new)) {
                // Trains linear regression model using gradient descent.
                double[] linearRegressionMdl = new double[2];

                for (int i = 0; i < 1000; i++) {
                    double[] gradient = dataset.gradient(linearRegressionMdl);

                    // Convergence criterion: stop when the Euclidean norm of the gradient is small enough.
                    if (BLAS.getInstance().dnrm2(gradient.length, gradient, 1) < 1e-4)
                        break;

                    // Gradient step with a fixed learning rate of 0.1 divided by the number of training rows.
                    for (int j = 0; j < gradient.length; j++)
                        linearRegressionMdl[j] -= 0.1 / persons.size() * gradient[j];
                }

                System.out.println("Linear Regression Model: " + Arrays.toString(linearRegressionMdl));
            }

            System.out.println(">>> Algorithm Specific Dataset example completed.");
        }
    }

    /**
     * Algorithm specific dataset. Extended version of {@code Dataset} with a {@code gradient} method.
     */
    private static class AlgorithmSpecificDataset
        extends DatasetWrapper<AlgorithmSpecificPartitionContext, SimpleLabeledDatasetData> {
        /** BLAS (Basic Linear Algebra Subprograms) instance. */
        private static final BLAS blas = BLAS.getInstance();

        /**
         * Constructs a new instance of dataset wrapper that delegates {@code compute} actions to the actual delegate.
         *
         * @param delegate Delegate that performs {@code compute} actions.
         */
        AlgorithmSpecificDataset(
            Dataset<AlgorithmSpecificPartitionContext, SimpleLabeledDatasetData> delegate) {
            super(delegate);
        }

        /**
         * Calculates the least-squares gradient for the given model {@code x} in a map-reduce manner: every
         * partition computes a local gradient and the partial results are combined with {@link #sum}.
         *
         * @param x Model parameters (linear regression coefficients).
         * @return Gradient vector aggregated over all partitions.
         */
        double[] gradient(double[] x) {
            return computeWithCtx((ctx, data, partIdx) -> {
                // tmp := A * x - labels, i.e. the residual vector. dgemv computes y := alpha*A*x + beta*y;
                // here alpha = 1.0, beta = -1.0 and y is initialized with a copy of the labels.
                double[] tmp = Arrays.copyOf(data.getLabels(), data.getRows());
                blas.dgemv("N", data.getRows(), data.getCols(), 1.0, data.getFeatures(),
                    Math.max(1, data.getRows()), x, 1, -1.0, tmp, 1);

                // res := A^T * tmp — the local gradient of 0.5 * ||A*x - labels||^2 on this partition.
                double[] res = new double[data.getCols()];
                blas.dgemv("T", data.getRows(), data.getCols(), 1.0, data.getFeatures(),
                    Math.max(1, data.getRows()), tmp, 1, 0.0, res, 1);

                // The iteration counter lives in the partition context so that it survives between compute calls.
                int iteration = ctx.getIteration();

                System.out.println("Iteration " + iteration + " on partition " + partIdx
                    + " completed with local result " + Arrays.toString(res));

                ctx.setIteration(iteration + 1);

                return res;
            }, this::sum);
        }

        /**
         * Sum of two vectors. Either argument may be {@code null} and is then treated as an identity element.
         * Note that the result is accumulated in place into {@code b} (daxpy computes b := a + b).
         *
         * @param a First vector (or {@code null}).
         * @param b Second vector (or {@code null}).
         * @return Element-wise sum.
         */
        public double[] sum(double[] a, double[] b) {
            if (a == null)
                return b;

            if (b == null)
                return a;

            blas.daxpy(a.length, 1.0, a, 1, b, 1);

            return b;
        }
    }

    /**
     * Algorithm specific partition context which keeps the iteration number.
     */
    private static class AlgorithmSpecificPartitionContext implements Serializable {
        /** */
        private static final long serialVersionUID = 1887368924266684044L;

        /** Iteration number. */
        private int iteration;

        /** @return Current iteration number. */
        public int getIteration() {
            return iteration;
        }

        /** @param iteration New iteration number. */
        public void setIteration(int iteration) {
            this.iteration = iteration;
        }
    }

    /**
     * Creates and fills the "PERSONS" cache (2 partitions) with sample data.
     *
     * @param ignite Ignite instance.
     * @return Cache with sample data.
     */
    private static IgniteCache<Integer, Person> createCache(Ignite ignite) {
        CacheConfiguration<Integer, Person> cacheConfiguration = new CacheConfiguration<>();

        cacheConfiguration.setName("PERSONS");
        cacheConfiguration.setAffinity(new RendezvousAffinityFunction(false, 2));

        IgniteCache<Integer, Person> persons = ignite.createCache(cacheConfiguration);

        persons.put(1, new Person("Mike", 1, 1));
        persons.put(2, new Person("John", 2, 2));
        persons.put(3, new Person("George", 3, 3));
        persons.put(4, new Person("Karl", 4, 4));

        return persons;
    }
}

http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/examples/src/main/java/org/apache/ignite/examples/ml/dataset/CacheBasedDatasetExample.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/ml/dataset/CacheBasedDatasetExample.java
 
b/examples/src/main/java/org/apache/ignite/examples/ml/dataset/CacheBasedDatasetExample.java
new file mode 100644
index 0000000..b1413ad
--- /dev/null
+++ 
b/examples/src/main/java/org/apache/ignite/examples/ml/dataset/CacheBasedDatasetExample.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.ml.dataset;
+
+import java.util.Arrays;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.examples.ml.dataset.model.Person;
+import org.apache.ignite.ml.dataset.DatasetFactory;
+import org.apache.ignite.ml.dataset.primitive.SimpleDataset;
+
+/**
+ * Example that shows how to create dataset based on an existing Ignite Cache 
and then use it to calculate {@code mean}
+ * and {@code std} values as well as {@code covariance} and {@code 
correlation} matrices.
+ */
+public class CacheBasedDatasetExample {
+    /** Run example. */
+    public static void main(String[] args) throws Exception {
+        try (Ignite ignite = 
Ignition.start("examples/config/example-ignite.xml")) {
+            System.out.println(">>> Cache Based Dataset example started.");
+
+            IgniteCache<Integer, Person> persons = createCache(ignite);
+
+            // Creates a cache based simple dataset containing features and 
providing standard dataset API.
+            try (SimpleDataset<?> dataset = DatasetFactory.createSimpleDataset(
+                ignite,
+                persons,
+                (k, v) -> new double[]{ v.getAge(), v.getSalary() },
+                2
+            )) {
+                // Calculation of the mean value. This calculation will be 
performed in map-reduce manner.
+                double[] mean = dataset.mean();
+                System.out.println("Mean \n\t" + Arrays.toString(mean));
+
+                // Calculation of the standard deviation. This calculation 
will be performed in map-reduce manner.
+                double[] std = dataset.std();
+                System.out.println("Standard deviation \n\t" + 
Arrays.toString(std));
+
+                // Calculation of the covariance matrix.  This calculation 
will be performed in map-reduce manner.
+                double[][] cov = dataset.cov();
+                System.out.println("Covariance matrix ");
+                for (double[] row : cov)
+                    System.out.println("\t" + Arrays.toString(row));
+
+                // Calculation of the correlation matrix.  This calculation 
will be performed in map-reduce manner.
+                double[][] corr = dataset.corr();
+                System.out.println("Correlation matrix ");
+                for (double[] row : corr)
+                    System.out.println("\t" + Arrays.toString(row));
+            }
+
+            System.out.println(">>> Cache Based Dataset example completed.");
+        }
+    }
+
+    /** */
+    private static IgniteCache<Integer, Person> createCache(Ignite ignite) {
+        CacheConfiguration<Integer, Person> cacheConfiguration = new 
CacheConfiguration<>();
+
+        cacheConfiguration.setName("PERSONS");
+        cacheConfiguration.setAffinity(new RendezvousAffinityFunction(false, 
2));
+
+        IgniteCache<Integer, Person> persons = 
ignite.createCache(cacheConfiguration);
+
+        persons.put(1, new Person("Mike", 42, 10000));
+        persons.put(2, new Person("John", 32, 64000));
+        persons.put(3, new Person("George", 53, 120000));
+        persons.put(4, new Person("Karl", 24, 70000));
+
+        return persons;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/examples/src/main/java/org/apache/ignite/examples/ml/dataset/LocalDatasetExample.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/ml/dataset/LocalDatasetExample.java
 
b/examples/src/main/java/org/apache/ignite/examples/ml/dataset/LocalDatasetExample.java
new file mode 100644
index 0000000..af14836
--- /dev/null
+++ 
b/examples/src/main/java/org/apache/ignite/examples/ml/dataset/LocalDatasetExample.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.ml.dataset;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.examples.ml.dataset.model.Person;
+import org.apache.ignite.ml.dataset.DatasetFactory;
+import org.apache.ignite.ml.dataset.primitive.SimpleDataset;
+
+/**
+ * Example that shows how to create dataset based on an existing local storage 
and then use it to calculate {@code mean}
+ * and {@code std} values as well as {@code covariance} and {@code 
correlation} matrices.
+ */
+public class LocalDatasetExample {
+    /** Run example. */
+    public static void main(String[] args) throws Exception {
+        try (Ignite ignite = 
Ignition.start("examples/config/example-ignite.xml")) {
+            System.out.println(">>> Local Dataset example started.");
+
+            Map<Integer, Person> persons = createCache(ignite);
+
+            // Creates a local simple dataset containing features and 
providing standard dataset API.
+            try (SimpleDataset<?> dataset = DatasetFactory.createSimpleDataset(
+                persons,
+                2,
+                (k, v) -> new double[]{ v.getAge(), v.getSalary() },
+                2
+            )) {
+                // Calculation of the mean value. This calculation will be 
performed in map-reduce manner.
+                double[] mean = dataset.mean();
+                System.out.println("Mean \n\t" + Arrays.toString(mean));
+
+                // Calculation of the standard deviation. This calculation 
will be performed in map-reduce manner.
+                double[] std = dataset.std();
+                System.out.println("Standard deviation \n\t" + 
Arrays.toString(std));
+
+                // Calculation of the covariance matrix.  This calculation 
will be performed in map-reduce manner.
+                double[][] cov = dataset.cov();
+                System.out.println("Covariance matrix ");
+                for (double[] row : cov)
+                    System.out.println("\t" + Arrays.toString(row));
+
+                // Calculation of the correlation matrix.  This calculation 
will be performed in map-reduce manner.
+                double[][] corr = dataset.corr();
+                System.out.println("Correlation matrix ");
+                for (double[] row : corr)
+                    System.out.println("\t" + Arrays.toString(row));
+            }
+
+            System.out.println(">>> Local Dataset example completed.");
+        }
+    }
+
+    /** */
+    private static Map<Integer, Person> createCache(Ignite ignite) {
+        Map<Integer, Person> persons = new HashMap<>();
+
+        persons.put(1, new Person("Mike", 42, 10000));
+        persons.put(2, new Person("John", 32, 64000));
+        persons.put(3, new Person("George", 53, 120000));
+        persons.put(4, new Person("Karl", 24, 70000));
+
+        return persons;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/examples/src/main/java/org/apache/ignite/examples/ml/dataset/model/Person.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/ml/dataset/model/Person.java
 
b/examples/src/main/java/org/apache/ignite/examples/ml/dataset/model/Person.java
new file mode 100644
index 0000000..3770de8
--- /dev/null
+++ 
b/examples/src/main/java/org/apache/ignite/examples/ml/dataset/model/Person.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.ml.dataset.model;
+
+/** Person model. */
/** Person model: an immutable value object with a name, an age and a salary. */
public class Person {
    /** Person's name. */
    private final String name;

    /** Person's age. */
    private final double age;

    /** Person's salary. */
    private final double salary;

    /**
     * Constructs a new instance of person.
     *
     * @param name Name.
     * @param age Age.
     * @param salary Salary.
     */
    public Person(String name, double age, double salary) {
        this.name = name;
        this.age = age;
        this.salary = salary;
    }

    /** @return Name. */
    public String getName() {
        return this.name;
    }

    /** @return Age. */
    public double getAge() {
        return this.age;
    }

    /** @return Salary. */
    public double getSalary() {
        return this.salary;
    }
}

http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/examples/src/main/java/org/apache/ignite/examples/ml/dataset/model/package-info.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/ml/dataset/model/package-info.java
 
b/examples/src/main/java/org/apache/ignite/examples/ml/dataset/model/package-info.java
new file mode 100644
index 0000000..86df332
--- /dev/null
+++ 
b/examples/src/main/java/org/apache/ignite/examples/ml/dataset/model/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Models used in machine learning dataset examples.
+ */
+package org.apache.ignite.examples.ml.dataset.model;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/examples/src/main/java/org/apache/ignite/examples/ml/dataset/package-info.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/ml/dataset/package-info.java
 
b/examples/src/main/java/org/apache/ignite/examples/ml/dataset/package-info.java
new file mode 100644
index 0000000..2d0fc1d
--- /dev/null
+++ 
b/examples/src/main/java/org/apache/ignite/examples/ml/dataset/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Machine learning dataset examples.
+ */
+package org.apache.ignite.examples.ml.dataset;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/NormalizationExample.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/NormalizationExample.java
 
b/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/NormalizationExample.java
new file mode 100644
index 0000000..008b4ca
--- /dev/null
+++ 
b/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/NormalizationExample.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.ml.preprocessing;
+
+import java.util.Arrays;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.examples.ml.dataset.model.Person;
+import org.apache.ignite.ml.dataset.DatasetBuilder;
+import org.apache.ignite.ml.dataset.DatasetFactory;
+import org.apache.ignite.ml.dataset.impl.cache.CacheBasedDatasetBuilder;
+import org.apache.ignite.ml.dataset.primitive.SimpleDataset;
+import org.apache.ignite.ml.math.functions.IgniteBiFunction;
+import 
org.apache.ignite.ml.preprocessing.normalization.NormalizationPreprocessor;
+import org.apache.ignite.ml.preprocessing.normalization.NormalizationTrainer;
+
+/**
+ * Example that shows how to use normalization preprocessor to normalize data.
+ *
+ * Machine learning preprocessors are built as a chain. Most often a first 
preprocessor is a feature extractor as shown
+ * in this example. The second preprocessor here is a normalization 
preprocessor which is built on top of the feature
+ * extractor and represents a chain of itself and the underlying feature 
extractor.
+ */
+public class NormalizationExample {
+    /** Run example. */
+    public static void main(String[] args) throws Exception {
+        try (Ignite ignite = 
Ignition.start("examples/config/example-ignite.xml")) {
+            System.out.println(">>> Normalization example started.");
+
+            IgniteCache<Integer, Person> persons = createCache(ignite);
+
+            DatasetBuilder<Integer, Person> builder = new 
CacheBasedDatasetBuilder<>(ignite, persons);
+
+            // Defines first preprocessor that extracts features from an 
upstream data.
+            IgniteBiFunction<Integer, Person, double[]> featureExtractor = (k, 
v) -> new double[] {
+                v.getAge(),
+                v.getSalary()
+            };
+
+            // Defines second preprocessor that normalizes features.
+            NormalizationPreprocessor<Integer, Person> preprocessor = new 
NormalizationTrainer<Integer, Person>()
+                .fit(builder, featureExtractor, 2);
+
+            // Creates a cache based simple dataset containing features and 
providing standard dataset API.
+            try (SimpleDataset<?> dataset = DatasetFactory.createSimpleDataset(
+                builder,
+                preprocessor,
+                2
+            )) {
+                // Calculation of the mean value. This calculation will be 
performed in map-reduce manner.
+                double[] mean = dataset.mean();
+                System.out.println("Mean \n\t" + Arrays.toString(mean));
+
+                // Calculation of the standard deviation. This calculation 
will be performed in map-reduce manner.
+                double[] std = dataset.std();
+                System.out.println("Standard deviation \n\t" + 
Arrays.toString(std));
+
+                // Calculation of the covariance matrix.  This calculation 
will be performed in map-reduce manner.
+                double[][] cov = dataset.cov();
+                System.out.println("Covariance matrix ");
+                for (double[] row : cov)
+                    System.out.println("\t" + Arrays.toString(row));
+
+                // Calculation of the correlation matrix.  This calculation 
will be performed in map-reduce manner.
+                double[][] corr = dataset.corr();
+                System.out.println("Correlation matrix ");
+                for (double[] row : corr)
+                    System.out.println("\t" + Arrays.toString(row));
+            }
+
+            System.out.println(">>> Normalization example completed.");
+        }
+    }
+
+    /** */
+    private static IgniteCache<Integer, Person> createCache(Ignite ignite) {
+        CacheConfiguration<Integer, Person> cacheConfiguration = new 
CacheConfiguration<>();
+
+        cacheConfiguration.setName("PERSONS");
+        cacheConfiguration.setAffinity(new RendezvousAffinityFunction(false, 
2));
+
+        IgniteCache<Integer, Person> persons = 
ignite.createCache(cacheConfiguration);
+
+        persons.put(1, new Person("Mike", 42, 10000));
+        persons.put(2, new Person("John", 32, 64000));
+        persons.put(3, new Person("George", 53, 120000));
+        persons.put(4, new Person("Karl", 24, 70000));
+
+        return persons;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/package-info.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/package-info.java
 
b/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/package-info.java
new file mode 100644
index 0000000..164f14d
--- /dev/null
+++ 
b/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Machine learning preprocessing examples.
+ */
+package org.apache.ignite.examples.ml.preprocessing;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/modules/ml/pom.xml
----------------------------------------------------------------------
diff --git a/modules/ml/pom.xml b/modules/ml/pom.xml
index 2e64582..f8d56d7 100644
--- a/modules/ml/pom.xml
+++ b/modules/ml/pom.xml
@@ -98,6 +98,13 @@
             <artifactId>SparseBitSet</artifactId>
             <version>1.0</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <version>${mockito.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <profiles>

http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/modules/ml/src/main/java/org/apache/ignite/ml/dataset/Dataset.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/Dataset.java 
b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/Dataset.java
new file mode 100644
index 0000000..24a2063
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/Dataset.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.dataset;
+
+import java.io.Serializable;
+import org.apache.ignite.ml.dataset.impl.cache.CacheBasedDataset;
+import org.apache.ignite.ml.dataset.impl.local.LocalDataset;
+import org.apache.ignite.ml.math.functions.IgniteBiConsumer;
+import org.apache.ignite.ml.math.functions.IgniteBiFunction;
+import org.apache.ignite.ml.math.functions.IgniteBinaryOperator;
+import org.apache.ignite.ml.math.functions.IgniteConsumer;
+import org.apache.ignite.ml.math.functions.IgniteFunction;
+import org.apache.ignite.ml.math.functions.IgniteTriConsumer;
+import org.apache.ignite.ml.math.functions.IgniteTriFunction;
+
+/**
+ * A dataset providing an API that allows to perform generic computations on a 
distributed data represented as a set of
+ * partitions distributed across a cluster or placed locally. Every partition 
contains a {@code context} (reliably
+ * stored segment) and {@code data} (unreliably stored segment, which can be 
recovered from an upstream data and a
+ * {@code context} if needed). Computations are performed in a {@code 
MapReduce} manner, what allows to reduce a
+ * network traffic for most of the machine learning algorithms.
+ *
+ * <p>Dataset functionality allows to implement iterative machine learning 
algorithms via introducing computation
+ * context. In case iterative algorithm requires to maintain a state available 
and updatable on every iteration this
+ * state can be stored in the {@code context} of the partition and after that 
it will be available in further
+ * computations even if the Ignite Cache partition will be moved to another 
node because of node failure or rebalancing.
+ *
+ * <p>Partition {@code context} should be {@link Serializable} to be saved in 
Ignite Cache. Partition {@code data}
+ * should be {@link AutoCloseable} to allow system to clean up correspondent 
resources when partition {@code data} is
+ * not needed anymore.
+ *
+ * @param <C> Type of a partition {@code context}.
+ * @param <D> Type of a partition {@code data}.
+ *
+ * @see CacheBasedDataset
+ * @see LocalDataset
+ * @see DatasetFactory
+ */
+public interface Dataset<C extends Serializable, D extends AutoCloseable> 
extends AutoCloseable {
+    /**
+     * Applies the specified {@code map} function to every partition {@code 
data}, {@code context} and partition
+     * index in the dataset and then reduces {@code map} results to final 
result by using the {@code reduce} function.
+     *
+     * @param map Function applied to every partition {@code data}, {@code 
context} and partition index.
+     * @param reduce Function applied to results of {@code map} to get final 
result.
+     * @param identity Identity.
+     * @param <R> Type of a result.
+     * @return Final result.
+     */
+    public <R> R computeWithCtx(IgniteTriFunction<C, D, Integer, R> map, 
IgniteBinaryOperator<R> reduce, R identity);
+
+    /**
+     * Applies the specified {@code map} function to every partition {@code 
data} and partition index in the dataset
+     * and then reduces {@code map} results to final result by using the 
{@code reduce} function.
+     *
+     * @param map Function applied to every partition {@code data} and 
partition index.
+     * @param reduce Function applied to results of {@code map} to get final 
result.
+     * @param identity Identity.
+     * @param <R> Type of a result.
+     * @return Final result.
+     */
+    public <R> R compute(IgniteBiFunction<D, Integer, R> map, 
IgniteBinaryOperator<R> reduce, R identity);
+
+    /**
+     * Applies the specified {@code map} function to every partition {@code 
data}, {@code context} and partition
+     * index in the dataset and then reduces {@code map} results to final 
result by using the {@code reduce} function.
+     *
+     * @param map Function applied to every partition {@code data}, {@code 
context} and partition index.
+     * @param reduce Function applied to results of {@code map} to get final 
result.
+     * @param <R> Type of a result.
+     * @return Final result.
+     */
+    default public <R> R computeWithCtx(IgniteTriFunction<C, D, Integer, R> 
map, IgniteBinaryOperator<R> reduce) {
+        return computeWithCtx(map, reduce, null);
+    }
+
+    /**
+     * Applies the specified {@code map} function to every partition {@code 
data} and partition index in the dataset
+     * and then reduces {@code map} results to final result by using the 
{@code reduce} function.
+     *
+     * @param map Function applied to every partition {@code data} and 
partition index.
+     * @param reduce Function applied to results of {@code map} to get final 
result.
+     * @param <R> Type of a result.
+     * @return Final result.
+     */
+    default public <R> R compute(IgniteBiFunction<D, Integer, R> map, 
IgniteBinaryOperator<R> reduce) {
+        return compute(map, reduce, null);
+    }
+
+    /**
+     * Applies the specified {@code map} function to every partition {@code 
data} and {@code context} in the dataset
+     * and then reduces {@code map} results to final result by using the 
{@code reduce} function.
+     *
+     * @param map Function applied to every partition {@code data} and {@code 
context}.
+     * @param reduce Function applied to results of {@code map} to get final 
result.
+     * @param identity Identity.
+     * @param <R> Type of a result.
+     * @return Final result.
+     */
+    default public <R> R computeWithCtx(IgniteBiFunction<C, D, R> map, 
IgniteBinaryOperator<R> reduce, R identity) {
+        return computeWithCtx((ctx, data, partIdx) -> map.apply(ctx, data), 
reduce, identity);
+    }
+
+    /**
+     * Applies the specified {@code map} function to every partition {@code 
data} in the dataset and then reduces
+     * {@code map} results to final result by using the {@code reduce} 
function.
+     *
+     * @param map Function applied to every partition {@code data}.
+     * @param reduce Function applied to results of {@code map} to get final 
result.
+     * @param identity Identity.
+     * @param <R> Type of a result.
+     * @return Final result.
+     */
+    default public <R> R compute(IgniteFunction<D, R> map, 
IgniteBinaryOperator<R> reduce, R identity) {
+        return compute((data, partIdx) -> map.apply(data), reduce, identity);
+    }
+
+    /**
+     * Applies the specified {@code map} function to every partition {@code 
data} and {@code context} in the dataset
+     * and then reduces {@code map} results to final result by using the 
{@code reduce} function.
+     *
+     * @param map Function applied to every partition {@code data} and {@code 
context}.
+     * @param reduce Function applied to results of {@code map} to get final 
result.
+     * @param <R> Type of a result.
+     * @return Final result.
+     */
+    default public <R> R computeWithCtx(IgniteBiFunction<C, D, R> map, 
IgniteBinaryOperator<R> reduce) {
+        return computeWithCtx((ctx, data, partIdx) -> map.apply(ctx, data), 
reduce);
+    }
+
+    /**
+     * Applies the specified {@code map} function to every partition {@code 
data} in the dataset and then reduces
+     * {@code map} results to final result by using the {@code reduce} 
function.
+     *
+     * @param map Function applied to every partition {@code data}.
+     * @param reduce Function applied to results of {@code map} to get final 
result.
+     * @param <R> Type of a result.
+     * @return Final result.
+     */
+    default public <R> R compute(IgniteFunction<D, R> map, 
IgniteBinaryOperator<R> reduce) {
+        return compute((data, partIdx) -> map.apply(data), reduce);
+    }
+
+    /**
+     * Applies the specified {@code map} function to every partition {@code 
data}, {@code context} and partition
+     * index in the dataset.
+     *
+     * @param map Function applied to every partition {@code data}, {@code 
context} and partition index.
+     */
+    default public void computeWithCtx(IgniteTriConsumer<C, D, Integer> map) {
+        computeWithCtx((ctx, data, partIdx) -> {
+            map.accept(ctx, data, partIdx);
+            return null;
+        }, (a, b) -> null);
+    }
+
+    /**
+     * Applies the specified {@code map} function to every partition {@code 
data} in the dataset and partition index.
+     *
+     * @param map Function applied to every partition {@code data} and 
partition index.
+     */
+    default public void compute(IgniteBiConsumer<D, Integer> map) {
+        compute((data, partIdx) -> {
+            map.accept(data, partIdx);
+            return null;
+        }, (a, b) -> null);
+    }
+
+    /**
+     * Applies the specified {@code map} function to every partition {@code 
data} and {@code context} in the dataset.
+     *
+     * @param map Function applied to every partition {@code data} and {@code 
context}.
+     */
+    default public void computeWithCtx(IgniteBiConsumer<C, D> map) {
+        computeWithCtx((ctx, data, partIdx) -> map.accept(ctx, data));
+    }
+
+    /**
+     * Applies the specified {@code map} function to every partition {@code 
data} in the dataset.
+     *
+     * @param map Function applied to every partition {@code data}.
+     */
+    default public void compute(IgniteConsumer<D> map) {
+        compute((data, partIdx) -> map.accept(data));
+    }
+
+    /**
+     * Wraps this dataset into the specified wrapper to introduce new 
functionality based on {@code compute} and
+     * {@code computeWithCtx} methods.
+     *
+     * @param wrapper Dataset wrapper.
+     * @param <I> Type of a new wrapped dataset.
+     * @return New wrapped dataset.
+     */
+    default public <I extends Dataset<C ,D>> I wrap(IgniteFunction<Dataset<C, 
D>, I> wrapper) {
+        return wrapper.apply(this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/modules/ml/src/main/java/org/apache/ignite/ml/dataset/DatasetBuilder.java
----------------------------------------------------------------------
diff --git 
a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/DatasetBuilder.java 
b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/DatasetBuilder.java
new file mode 100644
index 0000000..a6757ff
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/DatasetBuilder.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.dataset;
+
+import java.io.Serializable;
+import org.apache.ignite.ml.dataset.impl.cache.CacheBasedDatasetBuilder;
+import org.apache.ignite.ml.dataset.impl.local.LocalDatasetBuilder;
+
+/**
+ * A builder constructing instances of a {@link Dataset}. Implementations of 
this interface encapsulate logic of
+ * building specific datasets such as allocation required data structures and 
initialization of {@code context} part of
+ * partitions.
+ *
+ * @param <K> Type of a key in {@code upstream} data.
+ * @param <V> Type of a value in {@code upstream} data.
+ *
+ * @see CacheBasedDatasetBuilder
+ * @see LocalDatasetBuilder
+ * @see Dataset
+ */
+public interface DatasetBuilder<K, V> {
+    /**
+     * Constructs a new instance of {@link Dataset} that includes allocation 
required data structures and
+     * initialization of {@code context} part of partitions.
+     *
+     * @param partCtxBuilder Partition {@code context} builder.
+     * @param partDataBuilder Partition {@code data} builder.
+     * @param <C> Type of a partition {@code context}.
+     * @param <D> Type of a partition {@code data}.
+     * @return Dataset.
+     */
+    public <C extends Serializable, D extends AutoCloseable> Dataset<C, D> 
build(
+        PartitionContextBuilder<K, V, C> partCtxBuilder, 
PartitionDataBuilder<K, V, C, D> partDataBuilder);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/modules/ml/src/main/java/org/apache/ignite/ml/dataset/DatasetFactory.java
----------------------------------------------------------------------
diff --git 
a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/DatasetFactory.java 
b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/DatasetFactory.java
new file mode 100644
index 0000000..af44a8a
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/DatasetFactory.java
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.dataset;
+
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.ml.dataset.impl.cache.CacheBasedDatasetBuilder;
+import org.apache.ignite.ml.dataset.impl.local.LocalDatasetBuilder;
+import org.apache.ignite.ml.dataset.primitive.SimpleDataset;
+import org.apache.ignite.ml.dataset.primitive.SimpleLabeledDataset;
+import 
org.apache.ignite.ml.dataset.primitive.builder.context.EmptyContextBuilder;
+import 
org.apache.ignite.ml.dataset.primitive.builder.data.SimpleDatasetDataBuilder;
+import 
org.apache.ignite.ml.dataset.primitive.builder.data.SimpleLabeledDatasetDataBuilder;
+import org.apache.ignite.ml.dataset.primitive.context.EmptyContext;
+import org.apache.ignite.ml.dataset.primitive.data.SimpleDatasetData;
+import org.apache.ignite.ml.dataset.primitive.data.SimpleLabeledDatasetData;
+import org.apache.ignite.ml.math.functions.IgniteBiFunction;
+
+/**
+ * Factory providing a client facing API that allows to construct basic and 
the most frequently used types of dataset.
+ *
+ *
+ * <p>Dataset construction is based on three major concepts: a partition 
{@code upstream}, {@code context} and
+ * {@code data}. A partition {@code upstream} is a data source, which assumed 
to be available all the time regardless
+ * node failures and rebalancing events. A partition {@code context} is a part 
of a partition maintained during the
+ * whole computation process and stored in a reliable storage so that a {@code 
context} is staying available and
+ * consistent regardless node failures and rebalancing events as well as an 
{@code upstream}. A partition {@code data}
+ * is a part of partition maintained during a computation process in 
unreliable local storage such as heap, off-heap or
+ * GPU memory on the node where current computation is performed, so that 
partition {@code data} can be lost as result
+ * of node failure or rebalancing, but it can be restored from an {@code 
upstream} and a partition {@code context}.
+ *
+ * <p>A partition {@code context} and {@code data} are built on top of an 
{@code upstream} by using specified
+ * builders: {@link PartitionContextBuilder} and {@link PartitionDataBuilder} 
correspondingly. To build a generic
+ * dataset the following approach is used:
+ *
+ * <code>
+ * {@code
+ * Dataset<C, D> dataset = DatasetFactory.create(
+ *     ignite,
+ *     cache,
+ *     partitionContextBuilder,
+ *     partitionDataBuilder
+ * );
+ * }
+ * </code>
+ *
+ * <p>As well as the generic building method {@code create} this factory 
provides methods that allow to create a
+ * specific dataset types such as method {@code createSimpleDataset} to create 
{@link SimpleDataset} and method
+ * {@code createSimpleLabeledDataset} to create {@link SimpleLabeledDataset}.
+ *
+ * @see Dataset
+ * @see PartitionContextBuilder
+ * @see PartitionDataBuilder
+ */
+public class DatasetFactory {
    /**
     * Creates a new instance of a dataset using the specified {@code partCtxBuilder} and {@code partDataBuilder}.
     * This is the generic method that allows to create a dataset with any desired partition {@code context} and
     * {@code data} on top of any {@link DatasetBuilder} implementation.
     *
     * @param datasetBuilder Dataset builder.
     * @param partCtxBuilder Partition {@code context} builder.
     * @param partDataBuilder Partition {@code data} builder.
     * @param <K> Type of a key in {@code upstream} data.
     * @param <V> Type of a value in {@code upstream} data.
     * @param <C> Type of a partition {@code context}.
     * @param <D> Type of a partition {@code data}.
     * @return Dataset.
     */
    public static <K, V, C extends Serializable, D extends AutoCloseable> Dataset<C, D> create(
        DatasetBuilder<K, V> datasetBuilder, PartitionContextBuilder<K, V, C> partCtxBuilder,
        PartitionDataBuilder<K, V, C, D> partDataBuilder) {
        return datasetBuilder.build(partCtxBuilder, partDataBuilder);
    }
+    /**
+     * Creates a new instance of distributed dataset using the specified 
{@code partCtxBuilder} and
+     * {@code partDataBuilder}. This is the generic methods that allows to 
create any Ignite Cache based datasets with
+     * any desired partition {@code context} and {@code data}.
+     *
+     * @param ignite Ignite instance.
+     * @param upstreamCache Ignite Cache with {@code upstream} data.
+     * @param partCtxBuilder Partition {@code context} builder.
+     * @param partDataBuilder Partition {@code data} builder.
+     * @param <K> Type of a key in {@code upstream} data.
+     * @param <V> Type of a value in {@code upstream} data.
+     * @param <C> Type of a partition {@code context}.
+     * @param <D> Type of a partition {@code data}.
+     * @return Dataset.
+     */
+    public static <K, V, C extends Serializable, D extends AutoCloseable> 
Dataset<C, D> create(
+        Ignite ignite, IgniteCache<K, V> upstreamCache, 
PartitionContextBuilder<K, V, C> partCtxBuilder,
+        PartitionDataBuilder<K, V, C, D> partDataBuilder) {
+        return create(new CacheBasedDatasetBuilder<>(ignite, upstreamCache), 
partCtxBuilder, partDataBuilder);
+    }
+
+    /**
+     * Creates a new instance of distributed {@link SimpleDataset} using the 
specified {@code partCtxBuilder} and
+     * {@code featureExtractor}. This methods determines partition {@code 
data} to be {@link SimpleDatasetData}, but
+     * allows to use any desired type of partition {@code context}.
+     *
+     * @param datasetBuilder Dataset builder.
+     * @param partCtxBuilder Partition {@code context} builder.
+     * @param featureExtractor Feature extractor used to extract features and 
build {@link SimpleDatasetData}.
+     * @param cols Number of columns (features) will be extracted.
+     * @param <K> Type of a key in {@code upstream} data.
+     * @param <V> Type of a value in {@code upstream} data.
+     * @param <C> Type of a partition {@code context}.
+     * @return Dataset.
+     */
+    public static <K, V, C extends Serializable> SimpleDataset<C> 
createSimpleDataset(
+        DatasetBuilder<K, V> datasetBuilder, PartitionContextBuilder<K, V, C> 
partCtxBuilder,
+        IgniteBiFunction<K, V, double[]> featureExtractor, int cols) {
+        return create(
+            datasetBuilder,
+            partCtxBuilder,
+            new SimpleDatasetDataBuilder<>(featureExtractor, cols)
+        ).wrap(SimpleDataset::new);
+    }
+
+    /**
+     * Creates a new instance of distributed {@link SimpleDataset} using the 
specified {@code partCtxBuilder} and
+     * {@code featureExtractor}. This methods determines partition {@code 
data} to be {@link SimpleDatasetData}, but
+     * allows to use any desired type of partition {@code context}.
+     *
+     * @param ignite Ignite instance.
+     * @param upstreamCache Ignite Cache with {@code upstream} data.
+     * @param partCtxBuilder Partition {@code context} builder.
+     * @param featureExtractor Feature extractor used to extract features and 
build {@link SimpleDatasetData}.
+     * @param cols Number of columns (features) will be extracted.
+     * @param <K> Type of a key in {@code upstream} data.
+     * @param <V> Type of a value in {@code upstream} data.
+     * @param <C> Type of a partition {@code context}.
+     * @return Dataset.
+     */
+    public static <K, V, C extends Serializable> SimpleDataset<C> 
createSimpleDataset(Ignite ignite,
+        IgniteCache<K, V> upstreamCache, PartitionContextBuilder<K, V, C> 
partCtxBuilder,
+        IgniteBiFunction<K, V, double[]> featureExtractor, int cols) {
+        return createSimpleDataset(new CacheBasedDatasetBuilder<>(ignite, 
upstreamCache), partCtxBuilder,
+            featureExtractor, cols);
+    }
+
    /**
     * Creates a new instance of a distributed {@link SimpleLabeledDataset} using the specified
     * {@code partCtxBuilder}, {@code featureExtractor} and {@code lbExtractor}. This method determines partition
     * {@code data} to be {@link SimpleLabeledDatasetData}, but allows to use any desired type of partition
     * {@code context}.
     *
     * @param datasetBuilder Dataset builder.
     * @param partCtxBuilder Partition {@code context} builder.
     * @param featureExtractor Feature extractor used to extract features and build {@link SimpleLabeledDatasetData}.
     * @param lbExtractor Label extractor used to extract labels and build {@link SimpleLabeledDatasetData}.
     * @param cols Number of columns (features) to be extracted.
     * @param <K> Type of a key in {@code upstream} data.
     * @param <V> Type of a value in {@code upstream} data.
     * @param <C> Type of a partition {@code context}.
     * @return Dataset.
     */
    public static <K, V, C extends Serializable> SimpleLabeledDataset<C> createSimpleLabeledDataset(
        DatasetBuilder<K, V> datasetBuilder, PartitionContextBuilder<K, V, C> partCtxBuilder,
        IgniteBiFunction<K, V, double[]> featureExtractor, IgniteBiFunction<K, V, Double> lbExtractor, int cols) {
        return create(
            datasetBuilder,
            partCtxBuilder,
            new SimpleLabeledDatasetDataBuilder<>(featureExtractor, lbExtractor, cols)
        ).wrap(SimpleLabeledDataset::new);
    }
+
    /**
     * Creates a new instance of a distributed {@link SimpleLabeledDataset} using the specified
     * {@code partCtxBuilder}, {@code featureExtractor} and {@code lbExtractor}. This method determines partition
     * {@code data} to be {@link SimpleLabeledDatasetData}, but allows to use any desired type of partition
     * {@code context}.
     *
     * @param ignite Ignite instance.
     * @param upstreamCache Ignite Cache with {@code upstream} data.
     * @param partCtxBuilder Partition {@code context} builder.
     * @param featureExtractor Feature extractor used to extract features and build {@link SimpleLabeledDatasetData}.
     * @param lbExtractor Label extractor used to extract labels and build {@link SimpleLabeledDatasetData}.
     * @param cols Number of columns (features) to be extracted.
     * @param <K> Type of a key in {@code upstream} data.
     * @param <V> Type of a value in {@code upstream} data.
     * @param <C> Type of a partition {@code context}.
     * @return Dataset.
     */
    public static <K, V, C extends Serializable> SimpleLabeledDataset<C> createSimpleLabeledDataset(Ignite ignite,
        IgniteCache<K, V> upstreamCache, PartitionContextBuilder<K, V, C> partCtxBuilder,
        IgniteBiFunction<K, V, double[]> featureExtractor, IgniteBiFunction<K, V, Double> lbExtractor, int cols) {
        return createSimpleLabeledDataset(new CacheBasedDatasetBuilder<>(ignite, upstreamCache), partCtxBuilder,
            featureExtractor, lbExtractor, cols);
    }
+
+    /**
+     * Creates a new instance of distributed {@link SimpleDataset} using the 
specified {@code featureExtractor}. This
+     * methods determines partition {@code context} to be {@link EmptyContext} 
and partition {@code data} to be
+     * {@link SimpleDatasetData}.
+     *
+     * @param datasetBuilder Dataset builder.
+     * @param featureExtractor Feature extractor used to extract features and 
build {@link SimpleDatasetData}.
+     * @param cols Number of columns (features) will be extracted.
+     * @param <K> Type of a key in {@code upstream} data.
+     * @param <V> Type of a value in {@code upstream} data.
+     * @return Dataset.
+     */
+    public static <K, V> SimpleDataset<EmptyContext> 
createSimpleDataset(DatasetBuilder<K, V> datasetBuilder,
+        IgniteBiFunction<K, V, double[]> featureExtractor, int cols) {
+        return createSimpleDataset(datasetBuilder, new 
EmptyContextBuilder<>(), featureExtractor, cols);
+    }
+
+    /**
+     * Creates a new instance of distributed {@link SimpleDataset} using the 
specified {@code featureExtractor}. This
+     * methods determines partition {@code context} to be {@link EmptyContext} 
and partition {@code data} to be
+     * {@link SimpleDatasetData}.
+     *
+     * @param ignite Ignite instance.
+     * @param upstreamCache Ignite Cache with {@code upstream} data.
+     * @param featureExtractor Feature extractor used to extract features and 
build {@link SimpleDatasetData}.
+     * @param cols Number of columns (features) will be extracted.
+     * @param <K> Type of a key in {@code upstream} data.
+     * @param <V> Type of a value in {@code upstream} data.
+     * @return Dataset.
+     */
+    public static <K, V> SimpleDataset<EmptyContext> 
createSimpleDataset(Ignite ignite, IgniteCache<K, V> upstreamCache,
+        IgniteBiFunction<K, V, double[]> featureExtractor, int cols) {
+        return createSimpleDataset(new CacheBasedDatasetBuilder<>(ignite, 
upstreamCache), featureExtractor, cols);
+    }
+
    /**
     * Creates a new instance of a distributed {@link SimpleLabeledDataset} using the specified
     * {@code featureExtractor} and {@code lbExtractor}. This method determines partition {@code context} to be
     * {@link EmptyContext} and partition {@code data} to be {@link SimpleLabeledDatasetData}.
     *
     * @param datasetBuilder Dataset builder.
     * @param featureExtractor Feature extractor used to extract features and build {@link SimpleLabeledDatasetData}.
     * @param lbExtractor Label extractor used to extract labels and build {@link SimpleLabeledDatasetData}.
     * @param cols Number of columns (features) to be extracted.
     * @param <K> Type of a key in {@code upstream} data.
     * @param <V> Type of a value in {@code upstream} data.
     * @return Dataset.
     */
    public static <K, V> SimpleLabeledDataset<EmptyContext> createSimpleLabeledDataset(
        DatasetBuilder<K, V> datasetBuilder, IgniteBiFunction<K, V, double[]> featureExtractor,
        IgniteBiFunction<K, V, Double> lbExtractor, int cols) {
        return createSimpleLabeledDataset(datasetBuilder, new EmptyContextBuilder<>(), featureExtractor, lbExtractor,
            cols);
    }
+
    /**
     * Creates a new instance of a distributed {@link SimpleLabeledDataset} using the specified
     * {@code featureExtractor} and {@code lbExtractor}. This method determines partition {@code context} to be
     * {@link EmptyContext} and partition {@code data} to be {@link SimpleLabeledDatasetData}.
     *
     * @param ignite Ignite instance.
     * @param upstreamCache Ignite Cache with {@code upstream} data.
     * @param featureExtractor Feature extractor used to extract features and build {@link SimpleLabeledDatasetData}.
     * @param lbExtractor Label extractor used to extract labels and build {@link SimpleLabeledDatasetData}.
     * @param cols Number of columns (features) to be extracted.
     * @param <K> Type of a key in {@code upstream} data.
     * @param <V> Type of a value in {@code upstream} data.
     * @return Dataset.
     */
    public static <K, V> SimpleLabeledDataset<EmptyContext> createSimpleLabeledDataset(Ignite ignite,
        IgniteCache<K, V> upstreamCache, IgniteBiFunction<K, V, double[]> featureExtractor,
        IgniteBiFunction<K, V, Double> lbExtractor, int cols) {
        return createSimpleLabeledDataset(new CacheBasedDatasetBuilder<>(ignite, upstreamCache), featureExtractor,
            lbExtractor, cols);
    }
+
+    /**
+     * Creates a new instance of local dataset using the specified {@code 
partCtxBuilder} and {@code partDataBuilder}.
+     * This is the generic methods that allows to create any Ignite Cache 
based datasets with any desired partition
+     * {@code context} and {@code data}.
+     *
+     * @param upstreamMap {@code Map} with {@code upstream} data.
+     * @param partitions Number of partitions {@code upstream} {@code Map} 
will be divided on.
+     * @param partCtxBuilder Partition {@code context} builder.
+     * @param partDataBuilder Partition {@code data} builder.
+     * @param <K> Type of a key in {@code upstream} data.
+     * @param <V> Type of a value in {@code upstream} data.
+     * @param <C> Type of a partition {@code context}.
+     * @param <D> Type of a partition {@code data}.
+     * @return Dataset.
+     */
+    public static <K, V, C extends Serializable, D extends AutoCloseable> 
Dataset<C, D> create(
+        Map<K, V> upstreamMap, int partitions, PartitionContextBuilder<K, V, 
C> partCtxBuilder,
+        PartitionDataBuilder<K, V, C, D> partDataBuilder) {
+        return create(new LocalDatasetBuilder<>(upstreamMap, partitions), 
partCtxBuilder, partDataBuilder);
+    }
+
+    /**
+     * Creates a new instance of local {@link SimpleDataset} using the 
specified {@code partCtxBuilder} and
+     * {@code featureExtractor}. This methods determines partition {@code 
data} to be {@link SimpleDatasetData}, but
+     * allows to use any desired type of partition {@code context}.
+     *
+     * @param upstreamMap {@code Map} with {@code upstream} data.
+     * @param partitions Number of partitions {@code upstream} {@code Map} 
will be divided on.
+     * @param partCtxBuilder Partition {@code context} builder.
+     * @param featureExtractor Feature extractor used to extract features and 
build {@link SimpleDatasetData}.
+     * @param cols Number of columns (features) will be extracted.
+     * @param <K> Type of a key in {@code upstream} data.
+     * @param <V> Type of a value in {@code upstream} data.
+     * @param <C> Type of a partition {@code context}.
+     * @return Dataset.
+     */
+    public static <K, V, C extends Serializable> SimpleDataset<C> 
createSimpleDataset(Map<K, V> upstreamMap,
+        int partitions, PartitionContextBuilder<K, V, C> partCtxBuilder,
+        IgniteBiFunction<K, V, double[]> featureExtractor, int cols) {
+        return createSimpleDataset(new LocalDatasetBuilder<>(upstreamMap, 
partitions), partCtxBuilder, featureExtractor,
+            cols);
+    }
+
+    /**
+     * Creates a new instance of local {@link SimpleLabeledDataset} using the 
specified {@code partCtxBuilder},
+     * {@code featureExtractor} and {@code lbExtractor}. This method 
determines partition {@code data} to be
+     * {@link SimpleLabeledDatasetData}, but allows to use any desired type of 
partition {@code context}.
+     *
+     * @param upstreamMap {@code Map} with {@code upstream} data.
+     * @param partitions Number of partitions {@code upstream} {@code Map} 
will be divided on.
+     * @param partCtxBuilder Partition {@code context} builder.
+     * @param featureExtractor Feature extractor used to extract features and 
build {@link SimpleLabeledDatasetData}.
+     * @param lbExtractor Label extractor used to extract labels and buikd 
{@link SimpleLabeledDatasetData}.
+     * @param cols Number of columns (features) will be extracted.
+     * @param <K> Type of a key in {@code upstream} data.
+     * @param <V> Type of a value in {@code upstream} data.
+     * @param <C> Type of a partition {@code context}.
+     * @return Dataset.
+     */
+    public static <K, V, C extends Serializable> SimpleLabeledDataset<C> 
createSimpleLabeledDataset(
+        Map<K, V> upstreamMap, int partitions, PartitionContextBuilder<K, V, 
C> partCtxBuilder,
+        IgniteBiFunction<K, V, double[]> featureExtractor, IgniteBiFunction<K, 
V, Double> lbExtractor, int cols) {
+        return createSimpleLabeledDataset(new 
LocalDatasetBuilder<>(upstreamMap, partitions), partCtxBuilder,
+            featureExtractor, lbExtractor, cols);
+    }
+
+    /**
+     * Creates a new instance of local {@link SimpleDataset} using the 
specified {@code featureExtractor}. This
+     * methods determines partition {@code context} to be {@link EmptyContext} 
and partition {@code data} to be
+     * {@link SimpleDatasetData}.
+     *
+     * @param upstreamMap {@code Map} with {@code upstream} data.
+     * @param partitions Number of partitions {@code upstream} {@code Map} 
will be divided on.
+     * @param featureExtractor Feature extractor used to extract features and 
build {@link SimpleDatasetData}.
+     * @param cols Number of columns (features) will be extracted.
+     * @param <K> Type of a key in {@code upstream} data.
+     * @param <V> Type of a value in {@code upstream} data.
+     * @return Dataset.
+     */
+    public static <K, V> SimpleDataset<EmptyContext> 
createSimpleDataset(Map<K, V> upstreamMap, int partitions,
+        IgniteBiFunction<K, V, double[]> featureExtractor, int cols) {
+        return createSimpleDataset(new LocalDatasetBuilder<>(upstreamMap, 
partitions), featureExtractor, cols);
+    }
+
+    /**
+     * Creates a new instance of local {@link SimpleLabeledDataset} using the 
specified {@code featureExtractor}
+     * and {@code lbExtractor}. This methods determines partition {@code 
context} to be {@link EmptyContext} and
+     * partition {@code data} to be {@link SimpleLabeledDatasetData}.
+     *
+     * @param upstreamMap {@code Map} with {@code upstream} data.
+     * @param partitions Number of partitions {@code upstream} {@code Map} 
will be divided on.
+     * @param featureExtractor Feature extractor used to extract features and 
build {@link SimpleLabeledDatasetData}.
+     * @param lbExtractor Label extractor used to extract labels and build 
{@link SimpleLabeledDatasetData}.
+     * @param cols Number of columns (features) will be extracted.
+     * @param <K> Type of a key in {@code upstream} data.
+     * @param <V> Type of a value in {@code upstream} data.
+     * @return Dataset.
+     */
+    public static <K, V> SimpleLabeledDataset<EmptyContext> 
createSimpleLabeledDataset(Map<K, V> upstreamMap,
+        int partitions, IgniteBiFunction<K, V, double[]> featureExtractor, 
IgniteBiFunction<K, V, Double> lbExtractor,
+        int cols) {
+        return createSimpleLabeledDataset(new 
LocalDatasetBuilder<>(upstreamMap, partitions), featureExtractor,
+            lbExtractor, cols);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/modules/ml/src/main/java/org/apache/ignite/ml/dataset/PartitionContextBuilder.java
----------------------------------------------------------------------
diff --git 
a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/PartitionContextBuilder.java
 
b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/PartitionContextBuilder.java
new file mode 100644
index 0000000..21c9907
--- /dev/null
+++ 
b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/PartitionContextBuilder.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.dataset;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import 
org.apache.ignite.ml.dataset.primitive.builder.context.EmptyContextBuilder;
+import org.apache.ignite.ml.math.functions.IgniteFunction;
+
+/**
+ * Builder that accepts a partition {@code upstream} data and makes partition 
{@code context}. This builder is used to
+ * build a partition {@code context} and assumed to be called only once for 
every partition during a dataset
+ * initialization.
+ *
+ * @param <K> Type of a key in {@code upstream} data.
+ * @param <V> Type of a value in {@code upstream} data.
+ * @param <C> Type of a partition {@code context}.
+ *
+ * @see EmptyContextBuilder
+ */
+@FunctionalInterface
+public interface PartitionContextBuilder<K, V, C extends Serializable> extends 
Serializable {
+    /**
+     * Builds a new partition {@code context} from an {@code upstream} data.
+     *
+     * @param upstreamData Partition {@code upstream} data.
+     * @param upstreamDataSize Partition {@code upstream} data size.
+     * @return Partition {@code context}.
+     */
+    public C build(Iterator<UpstreamEntry<K, V>> upstreamData, long 
upstreamDataSize);
+
+    /**
+     * Makes a composed partition {@code context} builder that first builds a 
{@code context} and then applies the
+     * specified function on the result.
+     *
+     * @param fun Function that applied after first partition {@code context} 
is built.
+     * @param <C2> New type of a partition {@code context}.
+     * @return Composed partition {@code context} builder.
+     */
+    default public <C2 extends Serializable> PartitionContextBuilder<K, V, C2> 
andThen(IgniteFunction<C, C2> fun) {
+        return (upstreamData, upstreamDataSize) -> 
fun.apply(build(upstreamData, upstreamDataSize));
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/modules/ml/src/main/java/org/apache/ignite/ml/dataset/PartitionDataBuilder.java
----------------------------------------------------------------------
diff --git 
a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/PartitionDataBuilder.java
 
b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/PartitionDataBuilder.java
new file mode 100644
index 0000000..361719f
--- /dev/null
+++ 
b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/PartitionDataBuilder.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.dataset;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import 
org.apache.ignite.ml.dataset.primitive.builder.data.SimpleDatasetDataBuilder;
+import 
org.apache.ignite.ml.dataset.primitive.builder.data.SimpleLabeledDatasetDataBuilder;
+import org.apache.ignite.ml.math.functions.IgniteBiFunction;
+
+/**
+ * Builder that accepts a partition {@code upstream} data and partition {@code 
context} and makes partition
+ * {@code data}. This builder is used to build a partition {@code data} and 
assumed to be called in all cases when
+ * partition {@code data} not found on the node that performs computation (it 
might be the result of a previous node
+ * failure or rebalancing).
+ *
+ * @param <K> Type of a key in <tt>upstream</tt> data.
+ * @param <V> Type of a value in <tt>upstream</tt> data.
+ * @param <C> Type of a partition <tt>context</tt>.
+ * @param <D> Type of a partition <tt>data</tt>.
+ * @see SimpleDatasetDataBuilder
+ * @see SimpleLabeledDatasetDataBuilder
+ */
+@FunctionalInterface
+public interface PartitionDataBuilder<K, V, C extends Serializable, D extends 
AutoCloseable> extends Serializable {
+    /**
+     * Builds a new partition {@code data} from a partition {@code upstream} 
data and partition {@code context}
+     *
+     * @param upstreamData Partition {@code upstream} data.
+     * @param upstreamDataSize Partition {@code upstream} data size.
+     * @param ctx Partition {@code context}.
+     * @return Partition {@code data}.
+     */
+    public D build(Iterator<UpstreamEntry<K, V>> upstreamData, long 
upstreamDataSize, C ctx);
+
+    /**
+     * Makes a composed partition {@code data} builder that first builds a 
{@code data} and then applies the specified
+     * function on the result.
+     *
+     * @param fun Function that applied after first partition {@code data} is 
built.
+     * @param <D2> New type of a partition {@code data}.
+     * @return Composed partition {@code data} builder.
+     */
+    default public <D2 extends AutoCloseable> PartitionDataBuilder<K, V, C, 
D2> andThen(
+        IgniteBiFunction<D, C, D2> fun) {
+        return (upstreamData, upstreamDataSize, ctx) -> 
fun.apply(build(upstreamData, upstreamDataSize, ctx), ctx);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/modules/ml/src/main/java/org/apache/ignite/ml/dataset/UpstreamEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/UpstreamEntry.java 
b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/UpstreamEntry.java
new file mode 100644
index 0000000..58226d9
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/UpstreamEntry.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.dataset;
+
+/**
+ * Entry of the {@code upstream}.
+ *
+ * @param <K> Type of a key in {@code upstream} data.
+ * @param <V> Type of a value in {@code upstream} data.
+ */
+public class UpstreamEntry<K, V> {
+    /** Key. */
+    private final K key;
+
+    /** Value. */
+    private final V val;
+
+    /**
+     * Constructs a new instance of upstream entry.
+     *
+     * @param key Key.
+     * @param val Value.
+     */
+    public UpstreamEntry(K key, V val) {
+        this.key = key;
+        this.val = val;
+    }
+
+    /** */
+    public K getKey() {
+        return key;
+    }
+
+    /** */
+    public V getValue() {
+        return val;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDataset.java
----------------------------------------------------------------------
diff --git 
a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDataset.java
 
b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDataset.java
new file mode 100644
index 0000000..463d496
--- /dev/null
+++ 
b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDataset.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.dataset.impl.cache;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.UUID;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.ml.dataset.Dataset;
+import org.apache.ignite.ml.dataset.PartitionDataBuilder;
+import org.apache.ignite.ml.dataset.impl.cache.util.ComputeUtils;
+import org.apache.ignite.ml.math.functions.IgniteBiFunction;
+import org.apache.ignite.ml.math.functions.IgniteBinaryOperator;
+import org.apache.ignite.ml.math.functions.IgniteFunction;
+import org.apache.ignite.ml.math.functions.IgniteTriFunction;
+
+/**
+ * An implementation of dataset based on Ignite Cache, which is used as {@code 
upstream} and as reliable storage for
+ * partition {@code context} as well.
+ *
+ * @param <K> Type of a key in {@code upstream} data.
+ * @param <V> Type of a value in {@code upstream} data.
+ * @param <C> Type of a partition {@code context}.
+ * @param <D> Type of a partition {@code data}.
+ */
+public class CacheBasedDataset<K, V, C extends Serializable, D extends 
AutoCloseable>
+    implements Dataset<C, D> {
+    /** Number of retries for the case when one of partitions not found on the 
node where computation is performed. */
+    private static final int RETRIES = 15 * 60;
+
+    /** Retry interval (ms) for the case when one of partitions not found on 
the node where computation is performed. */
+    private static final int RETRY_INTERVAL = 1000;
+
+    /** Ignite instance. */
+    private final Ignite ignite;
+
+    /** Ignite Cache with {@code upstream} data. */
+    private final IgniteCache<K, V> upstreamCache;
+
+    /** Ignite Cache with partition {@code context}. */
+    private final IgniteCache<Integer, C> datasetCache;
+
+    /** Partition {@code data} builder. */
+    private final PartitionDataBuilder<K, V, C, D> partDataBuilder;
+
+    /** Dataset ID that is used to identify dataset in local storage on the 
node where computation is performed. */
+    private final UUID datasetId;
+
+    /**
+     * Constructs a new instance of dataset based on Ignite Cache, which is 
used as {@code upstream} and as reliable storage for
+     * partition {@code context} as well.
+     *
+     * @param ignite Ignite instance.
+     * @param upstreamCache Ignite Cache with {@code upstream} data.
+     * @param datasetCache Ignite Cache with partition {@code context}.
+     * @param partDataBuilder Partition {@code data} builder.
+     * @param datasetId Dataset ID.
+     */
+    public CacheBasedDataset(Ignite ignite, IgniteCache<K, V> upstreamCache,
+        IgniteCache<Integer, C> datasetCache, PartitionDataBuilder<K, V, C, D> 
partDataBuilder,
+        UUID datasetId) {
+        this.ignite = ignite;
+        this.upstreamCache = upstreamCache;
+        this.datasetCache = datasetCache;
+        this.partDataBuilder = partDataBuilder;
+        this.datasetId = datasetId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <R> R computeWithCtx(IgniteTriFunction<C, D, Integer, R> 
map, IgniteBinaryOperator<R> reduce, R identity) {
+        String upstreamCacheName = upstreamCache.getName();
+        String datasetCacheName = datasetCache.getName();
+
+        return computeForAllPartitions(part -> {
+            C ctx = ComputeUtils.getContext(Ignition.localIgnite(), 
datasetCacheName, part);
+
+            D data = ComputeUtils.getData(
+                Ignition.localIgnite(),
+                upstreamCacheName,
+                datasetCacheName,
+                datasetId,
+                part,
+                partDataBuilder
+            );
+
+            R res = map.apply(ctx, data, part);
+
+            // Saves partition context after update.
+            ComputeUtils.saveContext(Ignition.localIgnite(), datasetCacheName, 
part, ctx);
+
+            return res;
+        }, reduce, identity);
+    }
+
+    /** {@inheritDoc} */
+    @Override public <R> R compute(IgniteBiFunction<D, Integer, R> map, 
IgniteBinaryOperator<R> reduce, R identity) {
+        String upstreamCacheName = upstreamCache.getName();
+        String datasetCacheName = datasetCache.getName();
+
+        return computeForAllPartitions(part -> {
+            D data = ComputeUtils.getData(
+                Ignition.localIgnite(),
+                upstreamCacheName,
+                datasetCacheName,
+                datasetId,
+                part,
+                partDataBuilder
+            );
+
+            return map.apply(data, part);
+        }, reduce, identity);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        datasetCache.destroy();
+    }
+
+    /**
+     * Calls the {@code MapReduce} job specified as the {@code fun} function 
and the {@code reduce} reducer on all
+     * partitions with guarantee that partitions with the same index of 
upstream and partition {@code context} caches
+     * will be on the same node during the computation and will not be moved 
before computation is finished.
+     *
+     * @param fun Function that applies to all partitions.
+     * @param reduce Function that reduces results of {@code fun}.
+     * @param identity Identity.
+     * @param <R> Type of a result.
+     * @return Final result.
+     */
+    private <R> R computeForAllPartitions(IgniteFunction<Integer, R> fun, 
IgniteBinaryOperator<R> reduce, R identity) {
+        Collection<String> cacheNames = Arrays.asList(datasetCache.getName(), 
upstreamCache.getName());
+        Collection<R> results = ComputeUtils.affinityCallWithRetries(ignite, 
cacheNames, fun, RETRIES, RETRY_INTERVAL);
+
+        R res = identity;
+        for (R partRes : results)
+            res = reduce.apply(res, partRes);
+
+        return res;
+    }
+
+    /** */
+    public IgniteCache<K, V> getUpstreamCache() {
+        return upstreamCache;
+    }
+
+    /** */
+    public IgniteCache<Integer, C> getDatasetCache() {
+        return datasetCache;
+    }
+}

Reply via email to