IGNITE-7437: Partition based dataset implementation this closes #3410
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/54bac750 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/54bac750 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/54bac750 Branch: refs/heads/master Commit: 54bac750098bba32993754935e867bfbdd42ebef Parents: eee4f17 Author: dmitrievanthony <[email protected]> Authored: Sun Feb 4 21:06:02 2018 +0300 Committer: Yury Babak <[email protected]> Committed: Sun Feb 4 21:06:02 2018 +0300 ---------------------------------------------------------------------- .../AlgorithmSpecificDatasetExample.java | 195 ++++++++++ .../ml/dataset/CacheBasedDatasetExample.java | 90 +++++ .../ml/dataset/LocalDatasetExample.java | 84 ++++ .../examples/ml/dataset/model/Person.java | 58 +++ .../examples/ml/dataset/model/package-info.java | 22 ++ .../examples/ml/dataset/package-info.java | 22 ++ .../ml/preprocessing/NormalizationExample.java | 109 ++++++ .../examples/ml/preprocessing/package-info.java | 22 ++ modules/ml/pom.xml | 7 + .../org/apache/ignite/ml/dataset/Dataset.java | 213 ++++++++++ .../ignite/ml/dataset/DatasetBuilder.java | 49 +++ .../ignite/ml/dataset/DatasetFactory.java | 387 +++++++++++++++++++ .../ml/dataset/PartitionContextBuilder.java | 58 +++ .../ignite/ml/dataset/PartitionDataBuilder.java | 63 +++ .../apache/ignite/ml/dataset/UpstreamEntry.java | 53 +++ .../dataset/impl/cache/CacheBasedDataset.java | 168 ++++++++ .../impl/cache/CacheBasedDatasetBuilder.java | 95 +++++ .../ml/dataset/impl/cache/package-info.java | 22 ++ .../dataset/impl/cache/util/ComputeUtils.java | 251 ++++++++++++ .../util/DatasetAffinityFunctionWrapper.java | 75 ++++ .../impl/cache/util/PartitionDataStorage.java | 65 ++++ .../impl/cache/util/UpstreamCursorAdapter.java | 68 ++++ .../dataset/impl/cache/util/package-info.java | 22 ++ .../ml/dataset/impl/local/LocalDataset.java | 88 +++++ .../dataset/impl/local/LocalDatasetBuilder.java | 137 +++++++ 
.../ml/dataset/impl/local/package-info.java | 22 ++ .../ignite/ml/dataset/impl/package-info.java | 22 ++ .../apache/ignite/ml/dataset/package-info.java | 22 ++ .../ml/dataset/primitive/DatasetWrapper.java | 63 +++ .../ml/dataset/primitive/SimpleDataset.java | 216 +++++++++++ .../dataset/primitive/SimpleLabeledDataset.java | 39 ++ .../builder/context/EmptyContextBuilder.java | 39 ++ .../primitive/builder/context/package-info.java | 22 ++ .../builder/data/SimpleDatasetDataBuilder.java | 76 ++++ .../data/SimpleLabeledDatasetDataBuilder.java | 86 +++++ .../primitive/builder/data/package-info.java | 22 ++ .../dataset/primitive/builder/package-info.java | 22 ++ .../dataset/primitive/context/EmptyContext.java | 28 ++ .../dataset/primitive/context/package-info.java | 22 ++ .../primitive/data/SimpleDatasetData.java | 69 ++++ .../data/SimpleLabeledDatasetData.java | 79 ++++ .../ml/dataset/primitive/data/package-info.java | 22 ++ .../ml/dataset/primitive/package-info.java | 28 ++ .../ml/preprocessing/PreprocessingTrainer.java | 41 ++ .../NormalizationPartitionData.java | 58 +++ .../NormalizationPreprocessor.java | 88 +++++ .../normalization/NormalizationTrainer.java | 90 +++++ .../normalization/package-info.java | 22 ++ .../ignite/ml/preprocessing/package-info.java | 22 ++ .../org/apache/ignite/ml/IgniteMLTestSuite.java | 6 +- .../ignite/ml/dataset/DatasetTestSuite.java | 45 +++ .../cache/CacheBasedDatasetBuilderTest.java | 107 +++++ .../impl/cache/CacheBasedDatasetTest.java | 353 +++++++++++++++++ .../impl/cache/util/ComputeUtilsTest.java | 309 +++++++++++++++ .../DatasetAffinityFunctionWrapperTest.java | 110 ++++++ .../cache/util/PartitionDataStorageTest.java | 49 +++ .../impl/local/LocalDatasetBuilderTest.java | 91 +++++ .../dataset/primitive/DatasetWrapperTest.java | 87 +++++ .../preprocessing/PreprocessingTestSuite.java | 35 ++ .../NormalizationPreprocessorTest.java | 54 +++ .../normalization/NormalizationTrainerTest.java | 76 ++++ 61 files changed, 4964 insertions(+), 1 
deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/examples/src/main/java/org/apache/ignite/examples/ml/dataset/AlgorithmSpecificDatasetExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/ml/dataset/AlgorithmSpecificDatasetExample.java b/examples/src/main/java/org/apache/ignite/examples/ml/dataset/AlgorithmSpecificDatasetExample.java new file mode 100644 index 0000000..98f85cd --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/ml/dataset/AlgorithmSpecificDatasetExample.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.examples.ml.dataset; + +import com.github.fommil.netlib.BLAS; +import java.io.Serializable; +import java.util.Arrays; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.Ignition; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.examples.ml.dataset.model.Person; +import org.apache.ignite.ml.dataset.Dataset; +import org.apache.ignite.ml.dataset.DatasetFactory; +import org.apache.ignite.ml.dataset.primitive.DatasetWrapper; +import org.apache.ignite.ml.dataset.primitive.builder.data.SimpleLabeledDatasetDataBuilder; +import org.apache.ignite.ml.dataset.primitive.data.SimpleLabeledDatasetData; + +/** + * Example that shows how to implement your own algorithm (gradient descent trainer for linear regression) which uses + * dataset as an underlying infrastructure. + * + * The common idea behind using algorithm specific datasets is to write a simple local version algorithm at first, then + * find operations which involves data manipulations, and finally define algorithm specific version of the dataset + * extended by introducing these new operations. As result your algorithm will work with extended dataset (based on + * {@link DatasetWrapper}) in a sequential manner. + * + * In this example we need to implement gradient descent. This is iterative method that involves calculation of gradient + * on every step. In according with the common idea we defines {@link AlgorithmSpecificDataset} - extended version + * of {@code Dataset} with {@code gradient} method. As result our gradient descent method looks like a simple loop where + * every iteration includes call of the {@code gradient} method. In the example we want to keep iteration number as well + * for logging. 
Iteration number cannot be recovered from the {@code upstream} data and we need to keep it in the custom + * partition {@code context} which is represented by {@link AlgorithmSpecificPartitionContext} class. + */ +public class AlgorithmSpecificDatasetExample { + /** Run example. */ + public static void main(String[] args) throws Exception { + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + System.out.println(">>> Algorithm Specific Dataset example started."); + + IgniteCache<Integer, Person> persons = createCache(ignite); + + // Creates a algorithm specific dataset to perform linear regression. Here we defines the way features and + // labels are extracted, and partition data and context are created. + try (AlgorithmSpecificDataset dataset = DatasetFactory.create( + ignite, + persons, + (upstream, upstreamSize) -> new AlgorithmSpecificPartitionContext(), + new SimpleLabeledDatasetDataBuilder<Integer, Person, AlgorithmSpecificPartitionContext>( + (k, v) -> new double[] {v.getAge()}, + (k, v) -> v.getSalary(), + 1 + ).andThen((data, ctx) -> { + double[] features = data.getFeatures(); + int rows = data.getRows(); + + // Makes a copy of features to supplement it by columns with values equal to 1.0. + double[] a = new double[features.length + rows]; + + for (int i = 0; i < rows; i++) + a[i] = 1.0; + + System.arraycopy(features, 0, a, rows, features.length); + + return new SimpleLabeledDatasetData(a, rows, data.getCols() + 1, data.getLabels()); + }) + ).wrap(AlgorithmSpecificDataset::new)) { + // Trains linear regression model using gradient descent. 
+ double[] linearRegressionMdl = new double[2]; + + for (int i = 0; i < 1000; i++) { + double[] gradient = dataset.gradient(linearRegressionMdl); + + if (BLAS.getInstance().dnrm2(gradient.length, gradient, 1) < 1e-4) + break; + + for (int j = 0; j < gradient.length; j++) + linearRegressionMdl[j] -= 0.1 / persons.size() * gradient[j]; + } + + System.out.println("Linear Regression Model: " + Arrays.toString(linearRegressionMdl)); + } + + System.out.println(">>> Algorithm Specific Dataset example completed."); + } + } + + /** + * Algorithm specific dataset. Extended version of {@code Dataset} with {@code gradient} method. + */ + private static class AlgorithmSpecificDataset + extends DatasetWrapper<AlgorithmSpecificPartitionContext, SimpleLabeledDatasetData> { + /** BLAS (Basic Linear Algebra Subprograms) instance. */ + private static final BLAS blas = BLAS.getInstance(); + + /** + * Constructs a new instance of dataset wrapper that delegates {@code compute} actions to the actual delegate. + * + * @param delegate Delegate that performs {@code compute} actions. + */ + AlgorithmSpecificDataset( + Dataset<AlgorithmSpecificPartitionContext, SimpleLabeledDatasetData> delegate) { + super(delegate); + } + + /** Calculate gradient. */ + double[] gradient(double[] x) { + return computeWithCtx((ctx, data, partIdx) -> { + double[] tmp = Arrays.copyOf(data.getLabels(), data.getRows()); + blas.dgemv("N", data.getRows(), data.getCols(), 1.0, data.getFeatures(), + Math.max(1, data.getRows()), x, 1, -1.0, tmp, 1); + + double[] res = new double[data.getCols()]; + blas.dgemv("T", data.getRows(), data.getCols(), 1.0, data.getFeatures(), + Math.max(1, data.getRows()), tmp, 1, 0.0, res, 1); + + int iteration = ctx.getIteration(); + + System.out.println("Iteration " + iteration + " on partition " + partIdx + + " completed with local result " + Arrays.toString(res)); + + ctx.setIteration(iteration + 1); + + return res; + }, this::sum); + } + + /** Sum of two vectors. 
*/ + public double[] sum(double[] a, double[] b) { + if (a == null) + return b; + + if (b == null) + return a; + + blas.daxpy(a.length, 1.0, a, 1, b, 1); + + return b; + } + } + + /** + * Algorithm specific partition context which keeps iteration number. + */ + private static class AlgorithmSpecificPartitionContext implements Serializable { + /** */ + private static final long serialVersionUID = 1887368924266684044L; + + /** Iteration number. */ + private int iteration; + + /** */ + public int getIteration() { + return iteration; + } + + /** */ + public void setIteration(int iteration) { + this.iteration = iteration; + } + } + + /** */ + private static IgniteCache<Integer, Person> createCache(Ignite ignite) { + CacheConfiguration<Integer, Person> cacheConfiguration = new CacheConfiguration<>(); + + cacheConfiguration.setName("PERSONS"); + cacheConfiguration.setAffinity(new RendezvousAffinityFunction(false, 2)); + + IgniteCache<Integer, Person> persons = ignite.createCache(cacheConfiguration); + + persons.put(1, new Person("Mike", 1, 1)); + persons.put(2, new Person("John", 2, 2)); + persons.put(3, new Person("George", 3, 3)); + persons.put(4, new Person("Karl", 4, 4)); + + return persons; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/examples/src/main/java/org/apache/ignite/examples/ml/dataset/CacheBasedDatasetExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/ml/dataset/CacheBasedDatasetExample.java b/examples/src/main/java/org/apache/ignite/examples/ml/dataset/CacheBasedDatasetExample.java new file mode 100644 index 0000000..b1413ad --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/ml/dataset/CacheBasedDatasetExample.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.ml.dataset; + +import java.util.Arrays; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.Ignition; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.examples.ml.dataset.model.Person; +import org.apache.ignite.ml.dataset.DatasetFactory; +import org.apache.ignite.ml.dataset.primitive.SimpleDataset; + +/** + * Example that shows how to create dataset based on an existing Ignite Cache and then use it to calculate {@code mean} + * and {@code std} values as well as {@code covariance} and {@code correlation} matrices. + */ +public class CacheBasedDatasetExample { + /** Run example. */ + public static void main(String[] args) throws Exception { + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + System.out.println(">>> Cache Based Dataset example started."); + + IgniteCache<Integer, Person> persons = createCache(ignite); + + // Creates a cache based simple dataset containing features and providing standard dataset API. 
+ try (SimpleDataset<?> dataset = DatasetFactory.createSimpleDataset( + ignite, + persons, + (k, v) -> new double[]{ v.getAge(), v.getSalary() }, + 2 + )) { + // Calculation of the mean value. This calculation will be performed in map-reduce manner. + double[] mean = dataset.mean(); + System.out.println("Mean \n\t" + Arrays.toString(mean)); + + // Calculation of the standard deviation. This calculation will be performed in map-reduce manner. + double[] std = dataset.std(); + System.out.println("Standard deviation \n\t" + Arrays.toString(std)); + + // Calculation of the covariance matrix. This calculation will be performed in map-reduce manner. + double[][] cov = dataset.cov(); + System.out.println("Covariance matrix "); + for (double[] row : cov) + System.out.println("\t" + Arrays.toString(row)); + + // Calculation of the correlation matrix. This calculation will be performed in map-reduce manner. + double[][] corr = dataset.corr(); + System.out.println("Correlation matrix "); + for (double[] row : corr) + System.out.println("\t" + Arrays.toString(row)); + } + + System.out.println(">>> Cache Based Dataset example completed."); + } + } + + /** */ + private static IgniteCache<Integer, Person> createCache(Ignite ignite) { + CacheConfiguration<Integer, Person> cacheConfiguration = new CacheConfiguration<>(); + + cacheConfiguration.setName("PERSONS"); + cacheConfiguration.setAffinity(new RendezvousAffinityFunction(false, 2)); + + IgniteCache<Integer, Person> persons = ignite.createCache(cacheConfiguration); + + persons.put(1, new Person("Mike", 42, 10000)); + persons.put(2, new Person("John", 32, 64000)); + persons.put(3, new Person("George", 53, 120000)); + persons.put(4, new Person("Karl", 24, 70000)); + + return persons; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/examples/src/main/java/org/apache/ignite/examples/ml/dataset/LocalDatasetExample.java ---------------------------------------------------------------------- diff --git 
a/examples/src/main/java/org/apache/ignite/examples/ml/dataset/LocalDatasetExample.java b/examples/src/main/java/org/apache/ignite/examples/ml/dataset/LocalDatasetExample.java new file mode 100644 index 0000000..af14836 --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/ml/dataset/LocalDatasetExample.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.ml.dataset; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import org.apache.ignite.Ignite; +import org.apache.ignite.Ignition; +import org.apache.ignite.examples.ml.dataset.model.Person; +import org.apache.ignite.ml.dataset.DatasetFactory; +import org.apache.ignite.ml.dataset.primitive.SimpleDataset; + +/** + * Example that shows how to create dataset based on an existing local storage and then use it to calculate {@code mean} + * and {@code std} values as well as {@code covariance} and {@code correlation} matrices. + */ +public class LocalDatasetExample { + /** Run example. 
*/ + public static void main(String[] args) throws Exception { + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + System.out.println(">>> Local Dataset example started."); + + Map<Integer, Person> persons = createCache(ignite); + + // Creates a local simple dataset containing features and providing standard dataset API. + try (SimpleDataset<?> dataset = DatasetFactory.createSimpleDataset( + persons, + 2, + (k, v) -> new double[]{ v.getAge(), v.getSalary() }, + 2 + )) { + // Calculation of the mean value. This calculation will be performed in map-reduce manner. + double[] mean = dataset.mean(); + System.out.println("Mean \n\t" + Arrays.toString(mean)); + + // Calculation of the standard deviation. This calculation will be performed in map-reduce manner. + double[] std = dataset.std(); + System.out.println("Standard deviation \n\t" + Arrays.toString(std)); + + // Calculation of the covariance matrix. This calculation will be performed in map-reduce manner. + double[][] cov = dataset.cov(); + System.out.println("Covariance matrix "); + for (double[] row : cov) + System.out.println("\t" + Arrays.toString(row)); + + // Calculation of the correlation matrix. This calculation will be performed in map-reduce manner. 
+ double[][] corr = dataset.corr(); + System.out.println("Correlation matrix "); + for (double[] row : corr) + System.out.println("\t" + Arrays.toString(row)); + } + + System.out.println(">>> Local Dataset example completed."); + } + } + + /** */ + private static Map<Integer, Person> createCache(Ignite ignite) { + Map<Integer, Person> persons = new HashMap<>(); + + persons.put(1, new Person("Mike", 42, 10000)); + persons.put(2, new Person("John", 32, 64000)); + persons.put(3, new Person("George", 53, 120000)); + persons.put(4, new Person("Karl", 24, 70000)); + + return persons; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/examples/src/main/java/org/apache/ignite/examples/ml/dataset/model/Person.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/ml/dataset/model/Person.java b/examples/src/main/java/org/apache/ignite/examples/ml/dataset/model/Person.java new file mode 100644 index 0000000..3770de8 --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/ml/dataset/model/Person.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.examples.ml.dataset.model; + +/** Person model. */ +public class Person { + /** Name. */ + private final String name; + + /** Age. */ + private final double age; + + /** Salary. */ + private final double salary; + + /** + * Constructs a new instance of person. + * + * @param name Name. + * @param age Age. + * @param salary Salary. + */ + public Person(String name, double age, double salary) { + this.name = name; + this.age = age; + this.salary = salary; + } + + /** */ + public String getName() { + return name; + } + + /** */ + public double getAge() { + return age; + } + + /** */ + public double getSalary() { + return salary; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/examples/src/main/java/org/apache/ignite/examples/ml/dataset/model/package-info.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/ml/dataset/model/package-info.java b/examples/src/main/java/org/apache/ignite/examples/ml/dataset/model/package-info.java new file mode 100644 index 0000000..86df332 --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/ml/dataset/model/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <!-- Package description. --> + * Models used in machine learning dataset examples. + */ +package org.apache.ignite.examples.ml.dataset.model; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/examples/src/main/java/org/apache/ignite/examples/ml/dataset/package-info.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/ml/dataset/package-info.java b/examples/src/main/java/org/apache/ignite/examples/ml/dataset/package-info.java new file mode 100644 index 0000000..2d0fc1d --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/ml/dataset/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <!-- Package description. --> + * Machine learning dataset examples. 
+ */ +package org.apache.ignite.examples.ml.dataset; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/NormalizationExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/NormalizationExample.java b/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/NormalizationExample.java new file mode 100644 index 0000000..008b4ca --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/NormalizationExample.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.examples.ml.preprocessing; + +import java.util.Arrays; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.Ignition; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.examples.ml.dataset.model.Person; +import org.apache.ignite.ml.dataset.DatasetBuilder; +import org.apache.ignite.ml.dataset.DatasetFactory; +import org.apache.ignite.ml.dataset.impl.cache.CacheBasedDatasetBuilder; +import org.apache.ignite.ml.dataset.primitive.SimpleDataset; +import org.apache.ignite.ml.math.functions.IgniteBiFunction; +import org.apache.ignite.ml.preprocessing.normalization.NormalizationPreprocessor; +import org.apache.ignite.ml.preprocessing.normalization.NormalizationTrainer; + +/** + * Example that shows how to use normalization preprocessor to normalize data. + * + * Machine learning preprocessors are built as a chain. Most often a first preprocessor is a feature extractor as shown + * in this example. The second preprocessor here is a normalization preprocessor which is built on top of the feature + * extractor and represents a chain of itself and the underlying feature extractor. + */ +public class NormalizationExample { + /** Run example. */ + public static void main(String[] args) throws Exception { + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + System.out.println(">>> Normalization example started."); + + IgniteCache<Integer, Person> persons = createCache(ignite); + + DatasetBuilder<Integer, Person> builder = new CacheBasedDatasetBuilder<>(ignite, persons); + + // Defines first preprocessor that extracts features from an upstream data. + IgniteBiFunction<Integer, Person, double[]> featureExtractor = (k, v) -> new double[] { + v.getAge(), + v.getSalary() + }; + + // Defines second preprocessor that normalizes features. 
+ NormalizationPreprocessor<Integer, Person> preprocessor = new NormalizationTrainer<Integer, Person>() + .fit(builder, featureExtractor, 2); + + // Creates a cache based simple dataset containing features and providing standard dataset API. + try (SimpleDataset<?> dataset = DatasetFactory.createSimpleDataset( + builder, + preprocessor, + 2 + )) { + // Calculation of the mean value. This calculation will be performed in map-reduce manner. + double[] mean = dataset.mean(); + System.out.println("Mean \n\t" + Arrays.toString(mean)); + + // Calculation of the standard deviation. This calculation will be performed in map-reduce manner. + double[] std = dataset.std(); + System.out.println("Standard deviation \n\t" + Arrays.toString(std)); + + // Calculation of the covariance matrix. This calculation will be performed in map-reduce manner. + double[][] cov = dataset.cov(); + System.out.println("Covariance matrix "); + for (double[] row : cov) + System.out.println("\t" + Arrays.toString(row)); + + // Calculation of the correlation matrix. This calculation will be performed in map-reduce manner. 
+ double[][] corr = dataset.corr(); + System.out.println("Correlation matrix "); + for (double[] row : corr) + System.out.println("\t" + Arrays.toString(row)); + } + + System.out.println(">>> Normalization example completed."); + } + } + + /** */ + private static IgniteCache<Integer, Person> createCache(Ignite ignite) { + CacheConfiguration<Integer, Person> cacheConfiguration = new CacheConfiguration<>(); + + cacheConfiguration.setName("PERSONS"); + cacheConfiguration.setAffinity(new RendezvousAffinityFunction(false, 2)); + + IgniteCache<Integer, Person> persons = ignite.createCache(cacheConfiguration); + + persons.put(1, new Person("Mike", 42, 10000)); + persons.put(2, new Person("John", 32, 64000)); + persons.put(3, new Person("George", 53, 120000)); + persons.put(4, new Person("Karl", 24, 70000)); + + return persons; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/package-info.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/package-info.java b/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/package-info.java new file mode 100644 index 0000000..164f14d --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <!-- Package description. --> + * Machine learning preprocessing examples. + */ +package org.apache.ignite.examples.ml.preprocessing; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/modules/ml/pom.xml ---------------------------------------------------------------------- diff --git a/modules/ml/pom.xml b/modules/ml/pom.xml index 2e64582..f8d56d7 100644 --- a/modules/ml/pom.xml +++ b/modules/ml/pom.xml @@ -98,6 +98,13 @@ <artifactId>SparseBitSet</artifactId> <version>1.0</version> </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <version>${mockito.version}</version> + <scope>test</scope> + </dependency> </dependencies> <profiles> http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/modules/ml/src/main/java/org/apache/ignite/ml/dataset/Dataset.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/Dataset.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/Dataset.java new file mode 100644 index 0000000..24a2063 --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/Dataset.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ml.dataset; + +import java.io.Serializable; +import org.apache.ignite.ml.dataset.impl.cache.CacheBasedDataset; +import org.apache.ignite.ml.dataset.impl.local.LocalDataset; +import org.apache.ignite.ml.math.functions.IgniteBiConsumer; +import org.apache.ignite.ml.math.functions.IgniteBiFunction; +import org.apache.ignite.ml.math.functions.IgniteBinaryOperator; +import org.apache.ignite.ml.math.functions.IgniteConsumer; +import org.apache.ignite.ml.math.functions.IgniteFunction; +import org.apache.ignite.ml.math.functions.IgniteTriConsumer; +import org.apache.ignite.ml.math.functions.IgniteTriFunction; + +/** + * A dataset providing an API that allows to perform generic computations on a distributed data represented as a set of + * partitions distributed across a cluster or placed locally. Every partition contains a {@code context} (reliably + * stored segment) and {@code data} (unreliably stored segment, which can be recovered from an upstream data and a + * {@code context} if needed). Computations are performed in a {@code MapReduce} manner, what allows to reduce a + * network traffic for most of the machine learning algorithms. + * + * <p>Dataset functionality allows to implement iterative machine learning algorithms via introducing computation + * context. 
In case iterative algorithm requires to maintain a state available and updatable on every iteration this + * state can be stored in the {@code context} of the partition and after that it will be available in further + * computations even if the Ignite Cache partition will be moved to another node because of node failure or rebalancing. + * + * <p>Partition {@code context} should be {@link Serializable} to be saved in Ignite Cache. Partition {@code data} + * should be {@link AutoCloseable} to allow system to clean up correspondent resources when partition {@code data} is + * not needed anymore. + * + * @param <C> Type of a partition {@code context}. + * @param <D> Type of a partition {@code data}. + * + * @see CacheBasedDataset + * @see LocalDataset + * @see DatasetFactory + */ +public interface Dataset<C extends Serializable, D extends AutoCloseable> extends AutoCloseable { + /** + * Applies the specified {@code map} function to every partition {@code data}, {@code context} and partition + * index in the dataset and then reduces {@code map} results to final result by using the {@code reduce} function. + * + * @param map Function applied to every partition {@code data}, {@code context} and partition index. + * @param reduce Function applied to results of {@code map} to get final result. + * @param identity Identity. + * @param <R> Type of a result. + * @return Final result. + */ + public <R> R computeWithCtx(IgniteTriFunction<C, D, Integer, R> map, IgniteBinaryOperator<R> reduce, R identity); + + /** + * Applies the specified {@code map} function to every partition {@code data} and partition index in the dataset + * and then reduces {@code map} results to final result by using the {@code reduce} function. + * + * @param map Function applied to every partition {@code data} and partition index. + * @param reduce Function applied to results of {@code map} to get final result. + * @param identity Identity. + * @param <R> Type of a result. + * @return Final result. 
+ */ + public <R> R compute(IgniteBiFunction<D, Integer, R> map, IgniteBinaryOperator<R> reduce, R identity); + + /** + * Applies the specified {@code map} function to every partition {@code data}, {@code context} and partition + * index in the dataset and then reduces {@code map} results to final result by using the {@code reduce} function. + * + * @param map Function applied to every partition {@code data}, {@code context} and partition index. + * @param reduce Function applied to results of {@code map} to get final result. + * @param <R> Type of a result. + * @return Final result. + */ + default public <R> R computeWithCtx(IgniteTriFunction<C, D, Integer, R> map, IgniteBinaryOperator<R> reduce) { + return computeWithCtx(map, reduce, null); + } + + /** + * Applies the specified {@code map} function to every partition {@code data} and partition index in the dataset + * and then reduces {@code map} results to final result by using the {@code reduce} function. + * + * @param map Function applied to every partition {@code data} and partition index. + * @param reduce Function applied to results of {@code map} to get final result. + * @param <R> Type of a result. + * @return Final result. + */ + default public <R> R compute(IgniteBiFunction<D, Integer, R> map, IgniteBinaryOperator<R> reduce) { + return compute(map, reduce, null); + } + + /** + * Applies the specified {@code map} function to every partition {@code data} and {@code context} in the dataset + * and then reduces {@code map} results to final result by using the {@code reduce} function. + * + * @param map Function applied to every partition {@code data} and {@code context}. + * @param reduce Function applied to results of {@code map} to get final result. + * @param identity Identity. + * @param <R> Type of a result. + * @return Final result. 
+ */ + default public <R> R computeWithCtx(IgniteBiFunction<C, D, R> map, IgniteBinaryOperator<R> reduce, R identity) { + return computeWithCtx((ctx, data, partIdx) -> map.apply(ctx, data), reduce, identity); + } + + /** + * Applies the specified {@code map} function to every partition {@code data} in the dataset and then reduces + * {@code map} results to final result by using the {@code reduce} function. + * + * @param map Function applied to every partition {@code data}. + * @param reduce Function applied to results of {@code map} to get final result. + * @param identity Identity. + * @param <R> Type of a result. + * @return Final result. + */ + default public <R> R compute(IgniteFunction<D, R> map, IgniteBinaryOperator<R> reduce, R identity) { + return compute((data, partIdx) -> map.apply(data), reduce, identity); + } + + /** + * Applies the specified {@code map} function to every partition {@code data} and {@code context} in the dataset + * and then reduces {@code map} results to final result by using the {@code reduce} function. + * + * @param map Function applied to every partition {@code data} and {@code context}. + * @param reduce Function applied to results of {@code map} to get final result. + * @param <R> Type of a result. + * @return Final result. + */ + default public <R> R computeWithCtx(IgniteBiFunction<C, D, R> map, IgniteBinaryOperator<R> reduce) { + return computeWithCtx((ctx, data, partIdx) -> map.apply(ctx, data), reduce); + } + + /** + * Applies the specified {@code map} function to every partition {@code data} in the dataset and then reduces + * {@code map} results to final result by using the {@code reduce} function. + * + * @param map Function applied to every partition {@code data}. + * @param reduce Function applied to results of {@code map} to get final result. + * @param <R> Type of a result. + * @return Final result. 
+ */ + default public <R> R compute(IgniteFunction<D, R> map, IgniteBinaryOperator<R> reduce) { + return compute((data, partIdx) -> map.apply(data), reduce); + } + + /** + * Applies the specified {@code map} function to every partition {@code data}, {@code context} and partition + * index in the dataset. + * + * @param map Function applied to every partition {@code data}, {@code context} and partition index. + */ + default public void computeWithCtx(IgniteTriConsumer<C, D, Integer> map) { + computeWithCtx((ctx, data, partIdx) -> { + map.accept(ctx, data, partIdx); + return null; + }, (a, b) -> null); + } + + /** + * Applies the specified {@code map} function to every partition {@code data} in the dataset and partition index. + * + * @param map Function applied to every partition {@code data} and partition index. + */ + default public void compute(IgniteBiConsumer<D, Integer> map) { + compute((data, partIdx) -> { + map.accept(data, partIdx); + return null; + }, (a, b) -> null); + } + + /** + * Applies the specified {@code map} function to every partition {@code data} and {@code context} in the dataset. + * + * @param map Function applied to every partition {@code data} and {@code context}. + */ + default public void computeWithCtx(IgniteBiConsumer<C, D> map) { + computeWithCtx((ctx, data, partIdx) -> map.accept(ctx, data)); + } + + /** + * Applies the specified {@code map} function to every partition {@code data} in the dataset. + * + * @param map Function applied to every partition {@code data}. + */ + default public void compute(IgniteConsumer<D> map) { + compute((data, partIdx) -> map.accept(data)); + } + + /** + * Wraps this dataset into the specified wrapper to introduce new functionality based on {@code compute} and + * {@code computeWithCtx} methods. + * + * @param wrapper Dataset wrapper. + * @param <I> Type of a new wrapped dataset. + * @return New wrapped dataset. 
+ */ + default public <I extends Dataset<C ,D>> I wrap(IgniteFunction<Dataset<C, D>, I> wrapper) { + return wrapper.apply(this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/modules/ml/src/main/java/org/apache/ignite/ml/dataset/DatasetBuilder.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/DatasetBuilder.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/DatasetBuilder.java new file mode 100644 index 0000000..a6757ff --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/DatasetBuilder.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ml.dataset; + +import java.io.Serializable; +import org.apache.ignite.ml.dataset.impl.cache.CacheBasedDatasetBuilder; +import org.apache.ignite.ml.dataset.impl.local.LocalDatasetBuilder; + +/** + * A builder constructing instances of a {@link Dataset}. Implementations of this interface encapsulate logic of + * building specific datasets such as allocation required data structures and initialization of {@code context} part of + * partitions. 
+ * + * @param <K> Type of a key in {@code upstream} data. + * @param <V> Type of a value in {@code upstream} data. + * + * @see CacheBasedDatasetBuilder + * @see LocalDatasetBuilder + * @see Dataset + */ +public interface DatasetBuilder<K, V> { + /** + * Constructs a new instance of {@link Dataset} that includes allocation required data structures and + * initialization of {@code context} part of partitions. + * + * @param partCtxBuilder Partition {@code context} builder. + * @param partDataBuilder Partition {@code data} builder. + * @param <C> Type of a partition {@code context}. + * @param <D> Type of a partition {@code data}. + * @return Dataset. + */ + public <C extends Serializable, D extends AutoCloseable> Dataset<C, D> build( + PartitionContextBuilder<K, V, C> partCtxBuilder, PartitionDataBuilder<K, V, C, D> partDataBuilder); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/modules/ml/src/main/java/org/apache/ignite/ml/dataset/DatasetFactory.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/DatasetFactory.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/DatasetFactory.java new file mode 100644 index 0000000..af44a8a --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/DatasetFactory.java @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ml.dataset; + +import java.io.Serializable; +import java.util.Map; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.ml.dataset.impl.cache.CacheBasedDatasetBuilder; +import org.apache.ignite.ml.dataset.impl.local.LocalDatasetBuilder; +import org.apache.ignite.ml.dataset.primitive.SimpleDataset; +import org.apache.ignite.ml.dataset.primitive.SimpleLabeledDataset; +import org.apache.ignite.ml.dataset.primitive.builder.context.EmptyContextBuilder; +import org.apache.ignite.ml.dataset.primitive.builder.data.SimpleDatasetDataBuilder; +import org.apache.ignite.ml.dataset.primitive.builder.data.SimpleLabeledDatasetDataBuilder; +import org.apache.ignite.ml.dataset.primitive.context.EmptyContext; +import org.apache.ignite.ml.dataset.primitive.data.SimpleDatasetData; +import org.apache.ignite.ml.dataset.primitive.data.SimpleLabeledDatasetData; +import org.apache.ignite.ml.math.functions.IgniteBiFunction; + +/** + * Factory providing a client facing API that allows to construct basic and the most frequently used types of dataset. + * + * + * <p>Dataset construction is based on three major concepts: a partition {@code upstream}, {@code context} and + * {@code data}. A partition {@code upstream} is a data source, which assumed to be available all the time regardless + * node failures and rebalancing events. 
A partition {@code context} is a part of a partition maintained during the + * whole computation process and stored in a reliable storage so that a {@code context} is staying available and + * consistent regardless node failures and rebalancing events as well as an {@code upstream}. A partition {@code data} + * is a part of partition maintained during a computation process in unreliable local storage such as heap, off-heap or + * GPU memory on the node where current computation is performed, so that partition {@code data} can be lost as result + * of node failure or rebalancing, but it can be restored from an {@code upstream} and a partition {@code context}. + * + * <p>A partition {@code context} and {@code data} are built on top of an {@code upstream} by using specified + * builders: {@link PartitionContextBuilder} and {@link PartitionDataBuilder} correspondingly. To build a generic + * dataset the following approach is used: + * + * <code> + * {@code + * Dataset<C, D> dataset = DatasetFactory.create( + * ignite, + * cache, + * partitionContextBuilder, + * partitionDataBuilder + * ); + * } + * </code> + * + * <p>As well as the generic building method {@code create} this factory provides methods that allow to create a + * specific dataset types such as method {@code createSimpleDataset} to create {@link SimpleDataset} and method + * {@code createSimpleLabeledDataset} to create {@link SimpleLabeledDataset}. + * + * @see Dataset + * @see PartitionContextBuilder + * @see PartitionDataBuilder + */ +public class DatasetFactory { + /** + * Creates a new instance of distributed dataset using the specified {@code partCtxBuilder} and + * {@code partDataBuilder}. This is the generic methods that allows to create any Ignite Cache based datasets with + * any desired partition {@code context} and {@code data}. + * + * @param datasetBuilder Dataset builder. + * @param partCtxBuilder Partition {@code context} builder. + * @param partDataBuilder Partition {@code data} builder. 
+ * @param <K> Type of a key in {@code upstream} data. + * @param <V> Type of a value in {@code upstream} data. + * @param <C> Type of a partition {@code context}. + * @param <D> Type of a partition {@code data}. + * @return Dataset. + */ + public static <K, V, C extends Serializable, D extends AutoCloseable> Dataset<C, D> create( + DatasetBuilder<K, V> datasetBuilder, PartitionContextBuilder<K, V, C> partCtxBuilder, + PartitionDataBuilder<K, V, C, D> partDataBuilder) { + return datasetBuilder.build(partCtxBuilder, partDataBuilder); + } + /** + * Creates a new instance of distributed dataset using the specified {@code partCtxBuilder} and + * {@code partDataBuilder}. This is the generic method that allows to create any Ignite Cache based datasets with + * any desired partition {@code context} and {@code data}. + * + * @param ignite Ignite instance. + * @param upstreamCache Ignite Cache with {@code upstream} data. + * @param partCtxBuilder Partition {@code context} builder. + * @param partDataBuilder Partition {@code data} builder. + * @param <K> Type of a key in {@code upstream} data. + * @param <V> Type of a value in {@code upstream} data. + * @param <C> Type of a partition {@code context}. + * @param <D> Type of a partition {@code data}. + * @return Dataset. + */ + public static <K, V, C extends Serializable, D extends AutoCloseable> Dataset<C, D> create( + Ignite ignite, IgniteCache<K, V> upstreamCache, PartitionContextBuilder<K, V, C> partCtxBuilder, + PartitionDataBuilder<K, V, C, D> partDataBuilder) { + return create(new CacheBasedDatasetBuilder<>(ignite, upstreamCache), partCtxBuilder, partDataBuilder); + } + + /** + * Creates a new instance of distributed {@link SimpleDataset} using the specified {@code partCtxBuilder} and + * {@code featureExtractor}. This method determines partition {@code data} to be {@link SimpleDatasetData}, but + * allows to use any desired type of partition {@code context}. + * + * @param datasetBuilder Dataset builder. 
+ * @param partCtxBuilder Partition {@code context} builder. + * @param featureExtractor Feature extractor used to extract features and build {@link SimpleDatasetData}. + * @param cols Number of columns (features) will be extracted. + * @param <K> Type of a key in {@code upstream} data. + * @param <V> Type of a value in {@code upstream} data. + * @param <C> Type of a partition {@code context}. + * @return Dataset. + */ + public static <K, V, C extends Serializable> SimpleDataset<C> createSimpleDataset( + DatasetBuilder<K, V> datasetBuilder, PartitionContextBuilder<K, V, C> partCtxBuilder, + IgniteBiFunction<K, V, double[]> featureExtractor, int cols) { + return create( + datasetBuilder, + partCtxBuilder, + new SimpleDatasetDataBuilder<>(featureExtractor, cols) + ).wrap(SimpleDataset::new); + } + + /** + * Creates a new instance of distributed {@link SimpleDataset} using the specified {@code partCtxBuilder} and + * {@code featureExtractor}. This methods determines partition {@code data} to be {@link SimpleDatasetData}, but + * allows to use any desired type of partition {@code context}. + * + * @param ignite Ignite instance. + * @param upstreamCache Ignite Cache with {@code upstream} data. + * @param partCtxBuilder Partition {@code context} builder. + * @param featureExtractor Feature extractor used to extract features and build {@link SimpleDatasetData}. + * @param cols Number of columns (features) will be extracted. + * @param <K> Type of a key in {@code upstream} data. + * @param <V> Type of a value in {@code upstream} data. + * @param <C> Type of a partition {@code context}. + * @return Dataset. 
+ */ + public static <K, V, C extends Serializable> SimpleDataset<C> createSimpleDataset(Ignite ignite, + IgniteCache<K, V> upstreamCache, PartitionContextBuilder<K, V, C> partCtxBuilder, + IgniteBiFunction<K, V, double[]> featureExtractor, int cols) { + return createSimpleDataset(new CacheBasedDatasetBuilder<>(ignite, upstreamCache), partCtxBuilder, + featureExtractor, cols); + } + + /** + * Creates a new instance of distributed {@link SimpleLabeledDataset} using the specified {@code partCtxBuilder}, + * {@code featureExtractor} and {@code lbExtractor}. This method determines partition {@code data} to be + * {@link SimpleLabeledDatasetData}, but allows to use any desired type of partition {@code context}. + * + * @param datasetBuilder Dataset builder. + * @param partCtxBuilder Partition {@code context} builder. + * @param featureExtractor Feature extractor used to extract features and build {@link SimpleLabeledDatasetData}. + * @param lbExtractor Label extractor used to extract labels and build {@link SimpleLabeledDatasetData}. + * @param cols Number of columns (features) will be extracted. + * @param <K> Type of a key in {@code upstream} data. + * @param <V> Type of a value in {@code upstream} data. + * @param <C> Type of a partition {@code context}. + * @return Dataset. + */ + public static <K, V, C extends Serializable> SimpleLabeledDataset<C> createSimpleLabeledDataset( + DatasetBuilder<K, V> datasetBuilder, PartitionContextBuilder<K, V, C> partCtxBuilder, + IgniteBiFunction<K, V, double[]> featureExtractor, IgniteBiFunction<K, V, Double> lbExtractor, int cols) { + return create( + datasetBuilder, + partCtxBuilder, + new SimpleLabeledDatasetDataBuilder<>(featureExtractor, lbExtractor, cols) + ).wrap(SimpleLabeledDataset::new); + } + + /** + * Creates a new instance of distributed {@link SimpleLabeledDataset} using the specified {@code partCtxBuilder}, + * {@code featureExtractor} and {@code lbExtractor}. 
This method determines partition {@code data} to be + * {@link SimpleLabeledDatasetData}, but allows to use any desired type of partition {@code context}. + * + * @param ignite Ignite instance. + * @param upstreamCache Ignite Cache with {@code upstream} data. + * @param partCtxBuilder Partition {@code context} builder. + * @param featureExtractor Feature extractor used to extract features and build {@link SimpleLabeledDatasetData}. + * @param lbExtractor Label extractor used to extract labels and build {@link SimpleLabeledDatasetData}. + * @param cols Number of columns (features) will be extracted. + * @param <K> Type of a key in {@code upstream} data. + * @param <V> Type of a value in {@code upstream} data. + * @param <C> Type of a partition {@code context}. + * @return Dataset. + */ + public static <K, V, C extends Serializable> SimpleLabeledDataset<C> createSimpleLabeledDataset(Ignite ignite, + IgniteCache<K, V> upstreamCache, PartitionContextBuilder<K, V, C> partCtxBuilder, + IgniteBiFunction<K, V, double[]> featureExtractor, IgniteBiFunction<K, V, Double> lbExtractor, int cols) { + return createSimpleLabeledDataset(new CacheBasedDatasetBuilder<>(ignite, upstreamCache), partCtxBuilder, + featureExtractor, lbExtractor, cols); + } + + /** + * Creates a new instance of distributed {@link SimpleDataset} using the specified {@code featureExtractor}. This + * method determines partition {@code context} to be {@link EmptyContext} and partition {@code data} to be + * {@link SimpleDatasetData}. + * + * @param datasetBuilder Dataset builder. + * @param featureExtractor Feature extractor used to extract features and build {@link SimpleDatasetData}. + * @param cols Number of columns (features) will be extracted. + * @param <K> Type of a key in {@code upstream} data. + * @param <V> Type of a value in {@code upstream} data. + * @return Dataset. 
+ */ + public static <K, V> SimpleDataset<EmptyContext> createSimpleDataset(DatasetBuilder<K, V> datasetBuilder, + IgniteBiFunction<K, V, double[]> featureExtractor, int cols) { + return createSimpleDataset(datasetBuilder, new EmptyContextBuilder<>(), featureExtractor, cols); + } + + /** + * Creates a new instance of distributed {@link SimpleDataset} using the specified {@code featureExtractor}. This + * method determines partition {@code context} to be {@link EmptyContext} and partition {@code data} to be + * {@link SimpleDatasetData}. + * + * @param ignite Ignite instance. + * @param upstreamCache Ignite Cache with {@code upstream} data. + * @param featureExtractor Feature extractor used to extract features and build {@link SimpleDatasetData}. + * @param cols Number of columns (features) will be extracted. + * @param <K> Type of a key in {@code upstream} data. + * @param <V> Type of a value in {@code upstream} data. + * @return Dataset. + */ + public static <K, V> SimpleDataset<EmptyContext> createSimpleDataset(Ignite ignite, IgniteCache<K, V> upstreamCache, + IgniteBiFunction<K, V, double[]> featureExtractor, int cols) { + return createSimpleDataset(new CacheBasedDatasetBuilder<>(ignite, upstreamCache), featureExtractor, cols); + } + + /** + * Creates a new instance of distributed {@link SimpleLabeledDataset} using the specified {@code featureExtractor} + * and {@code lbExtractor}. This method determines partition {@code context} to be {@link EmptyContext} and + * partition {@code data} to be {@link SimpleLabeledDatasetData}. + * + * @param datasetBuilder Dataset builder. + * @param featureExtractor Feature extractor used to extract features and build {@link SimpleLabeledDatasetData}. + * @param lbExtractor Label extractor used to extract labels and build {@link SimpleLabeledDatasetData}. + * @param cols Number of columns (features) will be extracted. + * @param <K> Type of a key in {@code upstream} data. 
+ * @param <V> Type of a value in {@code upstream} data. + * @return Dataset. + */ + public static <K, V> SimpleLabeledDataset<EmptyContext> createSimpleLabeledDataset( + DatasetBuilder<K, V> datasetBuilder, IgniteBiFunction<K, V, double[]> featureExtractor, + IgniteBiFunction<K, V, Double> lbExtractor, int cols) { + return createSimpleLabeledDataset(datasetBuilder, new EmptyContextBuilder<>(), featureExtractor, lbExtractor, + cols); + } + + /** + * Creates a new instance of distributed {@link SimpleLabeledDataset} using the specified {@code featureExtractor} + * and {@code lbExtractor}. This method determines partition {@code context} to be {@link EmptyContext} and + * partition {@code data} to be {@link SimpleLabeledDatasetData}. + * + * @param ignite Ignite instance. + * @param upstreamCache Ignite Cache with {@code upstream} data. + * @param featureExtractor Feature extractor used to extract features and build {@link SimpleLabeledDatasetData}. + * @param lbExtractor Label extractor used to extract labels and build {@link SimpleLabeledDatasetData}. + * @param cols Number of columns (features) will be extracted. + * @param <K> Type of a key in {@code upstream} data. + * @param <V> Type of a value in {@code upstream} data. + * @return Dataset. + */ + public static <K, V> SimpleLabeledDataset<EmptyContext> createSimpleLabeledDataset(Ignite ignite, + IgniteCache<K, V> upstreamCache, IgniteBiFunction<K, V, double[]> featureExtractor, + IgniteBiFunction<K, V, Double> lbExtractor, int cols) { + return createSimpleLabeledDataset(new CacheBasedDatasetBuilder<>(ignite, upstreamCache), featureExtractor, + lbExtractor, cols); + } + + /** + * Creates a new instance of local dataset using the specified {@code partCtxBuilder} and {@code partDataBuilder}. + * This is the generic method that allows to create any Ignite Cache based datasets with any desired partition + * {@code context} and {@code data}. + * + * @param upstreamMap {@code Map} with {@code upstream} data. 
+ * @param partitions Number of partitions {@code upstream} {@code Map} will be divided on. + * @param partCtxBuilder Partition {@code context} builder. + * @param partDataBuilder Partition {@code data} builder. + * @param <K> Type of a key in {@code upstream} data. + * @param <V> Type of a value in {@code upstream} data. + * @param <C> Type of a partition {@code context}. + * @param <D> Type of a partition {@code data}. + * @return Dataset. + */ + public static <K, V, C extends Serializable, D extends AutoCloseable> Dataset<C, D> create( + Map<K, V> upstreamMap, int partitions, PartitionContextBuilder<K, V, C> partCtxBuilder, + PartitionDataBuilder<K, V, C, D> partDataBuilder) { + return create(new LocalDatasetBuilder<>(upstreamMap, partitions), partCtxBuilder, partDataBuilder); + } + + /** + * Creates a new instance of local {@link SimpleDataset} using the specified {@code partCtxBuilder} and + * {@code featureExtractor}. This methods determines partition {@code data} to be {@link SimpleDatasetData}, but + * allows to use any desired type of partition {@code context}. + * + * @param upstreamMap {@code Map} with {@code upstream} data. + * @param partitions Number of partitions {@code upstream} {@code Map} will be divided on. + * @param partCtxBuilder Partition {@code context} builder. + * @param featureExtractor Feature extractor used to extract features and build {@link SimpleDatasetData}. + * @param cols Number of columns (features) will be extracted. + * @param <K> Type of a key in {@code upstream} data. + * @param <V> Type of a value in {@code upstream} data. + * @param <C> Type of a partition {@code context}. + * @return Dataset. 
+ */ + public static <K, V, C extends Serializable> SimpleDataset<C> createSimpleDataset(Map<K, V> upstreamMap, + int partitions, PartitionContextBuilder<K, V, C> partCtxBuilder, + IgniteBiFunction<K, V, double[]> featureExtractor, int cols) { + return createSimpleDataset(new LocalDatasetBuilder<>(upstreamMap, partitions), partCtxBuilder, featureExtractor, + cols); + } + + /** + * Creates a new instance of local {@link SimpleLabeledDataset} using the specified {@code partCtxBuilder}, + * {@code featureExtractor} and {@code lbExtractor}. This method determines partition {@code data} to be + * {@link SimpleLabeledDatasetData}, but allows to use any desired type of partition {@code context}. + * + * @param upstreamMap {@code Map} with {@code upstream} data. + * @param partitions Number of partitions {@code upstream} {@code Map} will be divided on. + * @param partCtxBuilder Partition {@code context} builder. + * @param featureExtractor Feature extractor used to extract features and build {@link SimpleLabeledDatasetData}. + * @param lbExtractor Label extractor used to extract labels and build {@link SimpleLabeledDatasetData}. + * @param cols Number of columns (features) will be extracted. + * @param <K> Type of a key in {@code upstream} data. + * @param <V> Type of a value in {@code upstream} data. + * @param <C> Type of a partition {@code context}. + * @return Dataset. + */ + public static <K, V, C extends Serializable> SimpleLabeledDataset<C> createSimpleLabeledDataset( + Map<K, V> upstreamMap, int partitions, PartitionContextBuilder<K, V, C> partCtxBuilder, + IgniteBiFunction<K, V, double[]> featureExtractor, IgniteBiFunction<K, V, Double> lbExtractor, int cols) { + return createSimpleLabeledDataset(new LocalDatasetBuilder<>(upstreamMap, partitions), partCtxBuilder, + featureExtractor, lbExtractor, cols); + } + + /** + * Creates a new instance of local {@link SimpleDataset} using the specified {@code featureExtractor}. 
This + * methods determines partition {@code context} to be {@link EmptyContext} and partition {@code data} to be + * {@link SimpleDatasetData}. + * + * @param upstreamMap {@code Map} with {@code upstream} data. + * @param partitions Number of partitions {@code upstream} {@code Map} will be divided on. + * @param featureExtractor Feature extractor used to extract features and build {@link SimpleDatasetData}. + * @param cols Number of columns (features) will be extracted. + * @param <K> Type of a key in {@code upstream} data. + * @param <V> Type of a value in {@code upstream} data. + * @return Dataset. + */ + public static <K, V> SimpleDataset<EmptyContext> createSimpleDataset(Map<K, V> upstreamMap, int partitions, + IgniteBiFunction<K, V, double[]> featureExtractor, int cols) { + return createSimpleDataset(new LocalDatasetBuilder<>(upstreamMap, partitions), featureExtractor, cols); + } + + /** + * Creates a new instance of local {@link SimpleLabeledDataset} using the specified {@code featureExtractor} + * and {@code lbExtractor}. This methods determines partition {@code context} to be {@link EmptyContext} and + * partition {@code data} to be {@link SimpleLabeledDatasetData}. + * + * @param upstreamMap {@code Map} with {@code upstream} data. + * @param partitions Number of partitions {@code upstream} {@code Map} will be divided on. + * @param featureExtractor Feature extractor used to extract features and build {@link SimpleLabeledDatasetData}. + * @param lbExtractor Label extractor used to extract labels and build {@link SimpleLabeledDatasetData}. + * @param cols Number of columns (features) will be extracted. + * @param <K> Type of a key in {@code upstream} data. + * @param <V> Type of a value in {@code upstream} data. + * @return Dataset. 
+ */ + public static <K, V> SimpleLabeledDataset<EmptyContext> createSimpleLabeledDataset(Map<K, V> upstreamMap, + int partitions, IgniteBiFunction<K, V, double[]> featureExtractor, IgniteBiFunction<K, V, Double> lbExtractor, + int cols) { + return createSimpleLabeledDataset(new LocalDatasetBuilder<>(upstreamMap, partitions), featureExtractor, + lbExtractor, cols); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/modules/ml/src/main/java/org/apache/ignite/ml/dataset/PartitionContextBuilder.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/PartitionContextBuilder.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/PartitionContextBuilder.java new file mode 100644 index 0000000..21c9907 --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/PartitionContextBuilder.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.ml.dataset; + +import java.io.Serializable; +import java.util.Iterator; +import org.apache.ignite.ml.dataset.primitive.builder.context.EmptyContextBuilder; +import org.apache.ignite.ml.math.functions.IgniteFunction; + +/** + * Builder that accepts a partition {@code upstream} data and makes partition {@code context}. This builder is used to + * build a partition {@code context} and assumed to be called only once for every partition during a dataset + * initialization. + * + * @param <K> Type of a key in {@code upstream} data. + * @param <V> Type of a value in {@code upstream} data. + * @param <C> Type of a partition {@code context}. + * + * @see EmptyContextBuilder + */ +@FunctionalInterface +public interface PartitionContextBuilder<K, V, C extends Serializable> extends Serializable { + /** + * Builds a new partition {@code context} from an {@code upstream} data. + * + * @param upstreamData Partition {@code upstream} data. + * @param upstreamDataSize Partition {@code upstream} data size. + * @return Partition {@code context}. + */ + public C build(Iterator<UpstreamEntry<K, V>> upstreamData, long upstreamDataSize); + + /** + * Makes a composed partition {@code context} builder that first builds a {@code context} and then applies the + * specified function on the result. + * + * @param fun Function that applied after first partition {@code context} is built. + * @param <C2> New type of a partition {@code context}. + * @return Composed partition {@code context} builder. 
+ */ + default public <C2 extends Serializable> PartitionContextBuilder<K, V, C2> andThen(IgniteFunction<C, C2> fun) { + return (upstreamData, upstreamDataSize) -> fun.apply(build(upstreamData, upstreamDataSize)); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/modules/ml/src/main/java/org/apache/ignite/ml/dataset/PartitionDataBuilder.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/PartitionDataBuilder.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/PartitionDataBuilder.java new file mode 100644 index 0000000..361719f --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/PartitionDataBuilder.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.ml.dataset; + +import java.io.Serializable; +import java.util.Iterator; +import org.apache.ignite.ml.dataset.primitive.builder.data.SimpleDatasetDataBuilder; +import org.apache.ignite.ml.dataset.primitive.builder.data.SimpleLabeledDatasetDataBuilder; +import org.apache.ignite.ml.math.functions.IgniteBiFunction; + +/** + * Builder that accepts a partition {@code upstream} data and partition {@code context} and makes partition + * {@code data}. This builder is used to build a partition {@code data} and assumed to be called in all cases when + * partition {@code data} not found on the node that performs computation (it might be the result of a previous node + * failure or rebalancing). + * + * @param <K> Type of a key in <tt>upstream</tt> data. + * @param <V> Type of a value in <tt>upstream</tt> data. + * @param <C> Type of a partition <tt>context</tt>. + * @param <D> Type of a partition <tt>data</tt>. + * @see SimpleDatasetDataBuilder + * @see SimpleLabeledDatasetDataBuilder + */ +@FunctionalInterface +public interface PartitionDataBuilder<K, V, C extends Serializable, D extends AutoCloseable> extends Serializable { + /** + * Builds a new partition {@code data} from a partition {@code upstream} data and partition {@code context} + * + * @param upstreamData Partition {@code upstream} data. + * @param upstreamDataSize Partition {@code upstream} data size. + * @param ctx Partition {@code context}. + * @return Partition {@code data}. + */ + public D build(Iterator<UpstreamEntry<K, V>> upstreamData, long upstreamDataSize, C ctx); + + /** + * Makes a composed partition {@code data} builder that first builds a {@code data} and then applies the specified + * function on the result. + * + * @param fun Function that applied after first partition {@code data} is built. + * @param <D2> New type of a partition {@code data}. + * @return Composed partition {@code data} builder. 
+ */ + default public <D2 extends AutoCloseable> PartitionDataBuilder<K, V, C, D2> andThen( + IgniteBiFunction<D, C, D2> fun) { + return (upstreamData, upstreamDataSize, ctx) -> fun.apply(build(upstreamData, upstreamDataSize, ctx), ctx); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/modules/ml/src/main/java/org/apache/ignite/ml/dataset/UpstreamEntry.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/UpstreamEntry.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/UpstreamEntry.java new file mode 100644 index 0000000..58226d9 --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/UpstreamEntry.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ml.dataset; + +/** + * Entry of the {@code upstream}. + * + * @param <K> Type of a key in {@code upstream} data. + * @param <V> Type of a value in {@code upstream} data. + */ +public class UpstreamEntry<K, V> { + /** Key. */ + private final K key; + + /** Value. */ + private final V val; + + /** + * Constructs a new instance of upstream entry. + * + * @param key Key. + * @param val Value. 
+ */ + public UpstreamEntry(K key, V val) { + this.key = key; + this.val = val; + } + + /** */ + public K getKey() { + return key; + } + + /** */ + public V getValue() { + return val; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDataset.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDataset.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDataset.java new file mode 100644 index 0000000..463d496 --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDataset.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.ml.dataset.impl.cache; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.Collection; +import java.util.UUID; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.Ignition; +import org.apache.ignite.ml.dataset.Dataset; +import org.apache.ignite.ml.dataset.PartitionDataBuilder; +import org.apache.ignite.ml.dataset.impl.cache.util.ComputeUtils; +import org.apache.ignite.ml.math.functions.IgniteBiFunction; +import org.apache.ignite.ml.math.functions.IgniteBinaryOperator; +import org.apache.ignite.ml.math.functions.IgniteFunction; +import org.apache.ignite.ml.math.functions.IgniteTriFunction; + +/** + * An implementation of dataset based on Ignite Cache, which is used as {@code upstream} and as reliable storage for + * partition {@code context} as well. + * + * @param <K> Type of a key in {@code upstream} data. + * @param <V> Type of a value in {@code upstream} data. + * @param <C> Type of a partition {@code context}. + * @param <D> Type of a partition {@code data}. + */ +public class CacheBasedDataset<K, V, C extends Serializable, D extends AutoCloseable> + implements Dataset<C, D> { + /** Number of retries for the case when one of partitions not found on the node where computation is performed. */ + private static final int RETRIES = 15 * 60; + + /** Retry interval (ms) for the case when one of partitions not found on the node where computation is performed. */ + private static final int RETRY_INTERVAL = 1000; + + /** Ignite instance. */ + private final Ignite ignite; + + /** Ignite Cache with {@code upstream} data. */ + private final IgniteCache<K, V> upstreamCache; + + /** Ignite Cache with partition {@code context}. */ + private final IgniteCache<Integer, C> datasetCache; + + /** Partition {@code data} builder. 
*/ + private final PartitionDataBuilder<K, V, C, D> partDataBuilder; + + /** Dataset ID that is used to identify dataset in local storage on the node where computation is performed. */ + private final UUID datasetId; + + /** + * Constructs a new instance of dataset based on Ignite Cache, which is used as {@code upstream} and as reliable storage for + * partition {@code context} as well. + * + * @param ignite Ignite instance. + * @param upstreamCache Ignite Cache with {@code upstream} data. + * @param datasetCache Ignite Cache with partition {@code context}. + * @param partDataBuilder Partition {@code data} builder. + * @param datasetId Dataset ID. + */ + public CacheBasedDataset(Ignite ignite, IgniteCache<K, V> upstreamCache, + IgniteCache<Integer, C> datasetCache, PartitionDataBuilder<K, V, C, D> partDataBuilder, + UUID datasetId) { + this.ignite = ignite; + this.upstreamCache = upstreamCache; + this.datasetCache = datasetCache; + this.partDataBuilder = partDataBuilder; + this.datasetId = datasetId; + } + + /** {@inheritDoc} */ + @Override public <R> R computeWithCtx(IgniteTriFunction<C, D, Integer, R> map, IgniteBinaryOperator<R> reduce, R identity) { + String upstreamCacheName = upstreamCache.getName(); + String datasetCacheName = datasetCache.getName(); + + return computeForAllPartitions(part -> { + C ctx = ComputeUtils.getContext(Ignition.localIgnite(), datasetCacheName, part); + + D data = ComputeUtils.getData( + Ignition.localIgnite(), + upstreamCacheName, + datasetCacheName, + datasetId, + part, + partDataBuilder + ); + + R res = map.apply(ctx, data, part); + + // Saves partition context after update. 
+ ComputeUtils.saveContext(Ignition.localIgnite(), datasetCacheName, part, ctx); + + return res; + }, reduce, identity); + } + + /** {@inheritDoc} */ + @Override public <R> R compute(IgniteBiFunction<D, Integer, R> map, IgniteBinaryOperator<R> reduce, R identity) { + String upstreamCacheName = upstreamCache.getName(); + String datasetCacheName = datasetCache.getName(); + + return computeForAllPartitions(part -> { + D data = ComputeUtils.getData( + Ignition.localIgnite(), + upstreamCacheName, + datasetCacheName, + datasetId, + part, + partDataBuilder + ); + + return map.apply(data, part); + }, reduce, identity); + } + + /** {@inheritDoc} */ + @Override public void close() { + datasetCache.destroy(); + } + + /** + * Calls the {@code MapReduce} job specified as the {@code fun} function and the {@code reduce} reducer on all + * partitions with guarantee that partitions with the same index of upstream and partition {@code context} caches + * will be on the same node during the computation and will not be moved before computation is finished. + * + * @param fun Function that applies to all partitions. + * @param reduce Function that reduces results of {@code fun}. + * @param identity Identity. + * @param <R> Type of a result. + * @return Final result. + */ + private <R> R computeForAllPartitions(IgniteFunction<Integer, R> fun, IgniteBinaryOperator<R> reduce, R identity) { + Collection<String> cacheNames = Arrays.asList(datasetCache.getName(), upstreamCache.getName()); + Collection<R> results = ComputeUtils.affinityCallWithRetries(ignite, cacheNames, fun, RETRIES, RETRY_INTERVAL); + + R res = identity; + for (R partRes : results) + res = reduce.apply(res, partRes); + + return res; + } + + /** */ + public IgniteCache<K, V> getUpstreamCache() { + return upstreamCache; + } + + /** */ + public IgniteCache<Integer, C> getDatasetCache() { + return datasetCache; + } +}
