This is an automated email from the ASF dual-hosted git repository.
rpardomeza pushed a commit to branch multi-spark
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
The following commit(s) were added to refs/heads/multi-spark by this push:
new 203e8348 Changes to run current version
203e8348 is described below
commit 203e8348275f351a3bd71163079699d1edc9cfb5
Author: Rodrigo Pardo Meza <[email protected]>
AuthorDate: Thu Jul 7 09:53:56 2022 +0200
Changes to run current version
---
.../org/apache/wayang/core/platform/Platform.java | 8 +
.../org/apache/wayang/core/test/DummyPlatform.java | 4 +
.../apache/wayang/java/platform/JavaPlatform.java | 4 +
.../apache/wayang/jdbc/test/HsqldbPlatform.java | 5 +
.../main/java/org/apache/wayang/spark/Spark.java | 26 +
.../wayang/spark/channels/ChannelConversions.java | 173 ++++---
.../org/apache/wayang/spark/mapping/Mappings.java | 24 +
.../wayang/spark/platform/SparkPlatform.java | 42 +-
.../wayang/spark/plugin/SparkMultiPlugin.java | 47 ++
.../resources/wayang-sparky-default.properties | 568 +++++++++++++++++++++
.../org/apache/wayang/spark/test/DoubleSpark.java | 50 ++
11 files changed, 873 insertions(+), 78 deletions(-)
diff --git
a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/platform/Platform.java
b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/platform/Platform.java
index 7a4e1139..728aee03 100644
---
a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/platform/Platform.java
+++
b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/platform/Platform.java
@@ -60,6 +60,12 @@ public abstract class Platform {
this.configureDefaults(Configuration.getDefaultConfiguration());
}
+ protected Platform(String name, String configName, String config) {
+ this.name = name;
+ this.configName = configName;
+ this.configureCustom(Configuration.getDefaultConfiguration(), config);
+ }
+
/**
* Configure default settings for this instance, e.g., to be able to
create {@link LoadProfileToTimeConverter}s.
*
@@ -67,6 +73,8 @@ public abstract class Platform {
*/
protected abstract void configureDefaults(Configuration configuration);
+ protected abstract void configureCustom(Configuration configuration,
String config);
+
/**
* <i>Shortcut.</i> Creates an {@link Executor} using the {@link
#getExecutorFactory()}.
*
diff --git
a/wayang-commons/wayang-core/src/test/java/org/apache/wayang/core/test/DummyPlatform.java
b/wayang-commons/wayang-core/src/test/java/org/apache/wayang/core/test/DummyPlatform.java
index c1b381c7..81fdda07 100644
---
a/wayang-commons/wayang-core/src/test/java/org/apache/wayang/core/test/DummyPlatform.java
+++
b/wayang-commons/wayang-core/src/test/java/org/apache/wayang/core/test/DummyPlatform.java
@@ -48,6 +48,10 @@ public class DummyPlatform extends Platform {
public void configureDefaults(Configuration configuration) {
}
+ @Override
+ public void configureCustom(Configuration configuration, String lala) {
+ }
+
@Override
public Executor.Factory getExecutorFactory() {
throw new UnsupportedOperationException();
diff --git
a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/platform/JavaPlatform.java
b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/platform/JavaPlatform.java
index 4aba1443..89a6d7e4 100644
---
a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/platform/JavaPlatform.java
+++
b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/platform/JavaPlatform.java
@@ -56,6 +56,10 @@ public class JavaPlatform extends Platform {
configuration.load(ReflectionUtils.loadResource(DEFAULT_CONFIG_FILE));
}
+ @Override
+ public void configureCustom(Configuration configuration, String lala) {
+ }
+
@Override
public Executor.Factory getExecutorFactory() {
return job -> new JavaExecutor(this, job);
diff --git
a/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/test/HsqldbPlatform.java
b/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/test/HsqldbPlatform.java
index c83338d0..4c8d318a 100644
---
a/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/test/HsqldbPlatform.java
+++
b/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/test/HsqldbPlatform.java
@@ -18,6 +18,7 @@
package org.apache.wayang.jdbc.test;
+import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.jdbc.platform.JdbcPlatformTemplate;
/**
@@ -35,6 +36,10 @@ public class HsqldbPlatform extends JdbcPlatformTemplate {
return instance;
}
+ @Override
+ public void configureCustom(Configuration configuration, String lala) {
+ }
+
@Override
protected String getJdbcDriverClassName() {
return org.hsqldb.jdbcDriver.class.getName();
diff --git
a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/Spark.java
b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/Spark.java
index 3bd3d911..1976c55d 100644
---
a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/Spark.java
+++
b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/Spark.java
@@ -22,6 +22,10 @@ import org.apache.wayang.spark.platform.SparkPlatform;
import org.apache.wayang.spark.plugin.SparkBasicPlugin;
import org.apache.wayang.spark.plugin.SparkConversionPlugin;
import org.apache.wayang.spark.plugin.SparkGraphPlugin;
+import org.apache.wayang.spark.plugin.SparkMultiPlugin;
+
+import java.util.HashMap;
+import java.util.Map;
/**
* Register for relevant components of this module.
@@ -30,6 +34,8 @@ public class Spark {
private final static SparkBasicPlugin PLUGIN = new SparkBasicPlugin();
+ private final static Map<String, SparkMultiPlugin> MULTI_PLUGINS = new
HashMap<>();
+
private final static SparkGraphPlugin GRAPH_PLUGIN = new
SparkGraphPlugin();
private final static SparkConversionPlugin CONVERSION_PLUGIN = new
SparkConversionPlugin();
@@ -43,6 +49,18 @@ public class Spark {
return PLUGIN;
}
+ /**
+ * Create and register a new {@link SparkMultiPlugin} under the given name.
+ *
+ * @return the newly created {@link SparkMultiPlugin}
+ */
+ public static SparkMultiPlugin multiPlugin(String name, String conf_path) {
+
+ SparkMultiPlugin MULTI_PLUGIN = new SparkMultiPlugin(name, conf_path);
+ MULTI_PLUGINS.put(name, MULTI_PLUGIN);
+ return MULTI_PLUGIN;
+ }
+
/**
* Retrieve the {@link SparkGraphPlugin}.
*
@@ -70,4 +88,12 @@ public class Spark {
return SparkPlatform.getInstance();
}
+ /**
+ * Retrieve the named {@link SparkPlatform}, configured from the given file.
+ *
+ * @return the {@link SparkPlatform} registered under {@code name}
+ */
+ public static SparkPlatform platform(String name, String config) {
+ return SparkPlatform.getInstance(name, config);
+ }
}
diff --git
a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/channels/ChannelConversions.java
b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/channels/ChannelConversions.java
index 9f8fca12..33422909 100644
---
a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/channels/ChannelConversions.java
+++
b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/channels/ChannelConversions.java
@@ -36,89 +36,114 @@ import
org.apache.wayang.spark.operators.SparkTsvFileSource;
import java.util.Arrays;
import java.util.Collection;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
/**
* {@link ChannelConversion}s used by the {@link JavaPlatform}.
*/
public class ChannelConversions {
- public static final ChannelConversion UNCACHED_RDD_TO_CACHED_RDD = new
DefaultChannelConversion(
- RddChannel.UNCACHED_DESCRIPTOR,
- RddChannel.CACHED_DESCRIPTOR,
- () -> new
SparkCacheOperator<>(DataSetType.createDefault(Void.class))
+ private static Supplier<ChannelConversion> UNCACHED_RDD_TO_CACHED_RDD (){
+ return () -> new DefaultChannelConversion(
+ RddChannel.UNCACHED_DESCRIPTOR,
+ RddChannel.CACHED_DESCRIPTOR,
+ () -> new
SparkCacheOperator<>(DataSetType.createDefault(Void.class))
+ );
+ }
+
+ private static Supplier<ChannelConversion> COLLECTION_TO_BROADCAST (){
+ return () -> new DefaultChannelConversion(
+ CollectionChannel.DESCRIPTOR,
+ BroadcastChannel.DESCRIPTOR,
+ () -> new
SparkBroadcastOperator<>(DataSetType.createDefault(Void.class))
+ );}
+
+ private static Supplier<ChannelConversion> COLLECTION_TO_UNCACHED_RDD (){
+ return () -> new DefaultChannelConversion(
+ CollectionChannel.DESCRIPTOR,
+ RddChannel.UNCACHED_DESCRIPTOR,
+ () -> new
SparkCollectionSource<>(DataSetType.createDefault(Void.class))
+ );}
+
+ private static Supplier<ChannelConversion> UNCACHED_RDD_TO_COLLECTION (){
+ return () -> new DefaultChannelConversion(
+ RddChannel.UNCACHED_DESCRIPTOR,
+ CollectionChannel.DESCRIPTOR,
+ () -> new
SparkCollectOperator<>(DataSetType.createDefault(Void.class))
+ );}
+
+ private static Supplier<ChannelConversion> CACHED_RDD_TO_COLLECTION (){
+ return () -> new DefaultChannelConversion(
+ RddChannel.CACHED_DESCRIPTOR,
+ CollectionChannel.DESCRIPTOR,
+ () -> new
SparkCollectOperator<>(DataSetType.createDefault(Void.class))
+ );}
+
+ private static Supplier<ChannelConversion> CACHED_RDD_TO_HDFS_TSV (){
+ return () -> new DefaultChannelConversion(
+ RddChannel.CACHED_DESCRIPTOR,
+ FileChannel.HDFS_TSV_DESCRIPTOR,
+ () -> new
SparkTsvFileSink<>(DataSetType.createDefaultUnchecked(Tuple2.class))
+ );}
+
+ private static Supplier<ChannelConversion> UNCACHED_RDD_TO_HDFS_TSV (){
+ return () -> new DefaultChannelConversion(
+ RddChannel.UNCACHED_DESCRIPTOR,
+ FileChannel.HDFS_TSV_DESCRIPTOR,
+ () -> new
SparkTsvFileSink<>(DataSetType.createDefaultUnchecked(Tuple2.class))
+ );}
+
+ private static Supplier<ChannelConversion> HDFS_TSV_TO_UNCACHED_RDD (){
+ return () -> new DefaultChannelConversion(
+ FileChannel.HDFS_TSV_DESCRIPTOR,
+ RddChannel.UNCACHED_DESCRIPTOR,
+ () -> new
SparkTsvFileSource(DataSetType.createDefault(Tuple2.class))
+ );}
+
+ private static Supplier<ChannelConversion> CACHED_RDD_TO_HDFS_OBJECT_FILE
() {
+ return () -> new DefaultChannelConversion(
+ RddChannel.CACHED_DESCRIPTOR,
+ FileChannel.HDFS_OBJECT_FILE_DESCRIPTOR,
+ () -> new
SparkObjectFileSink<>(DataSetType.createDefault(Void.class))
+ );
+ }
+
+ private static Supplier<ChannelConversion>
UNCACHED_RDD_TO_HDFS_OBJECT_FILE () {
+ return () -> new DefaultChannelConversion(
+ RddChannel.UNCACHED_DESCRIPTOR,
+ FileChannel.HDFS_OBJECT_FILE_DESCRIPTOR,
+ () -> new
SparkObjectFileSink<>(DataSetType.createDefault(Void.class))
+ );
+ }
+
+ private static Supplier<ChannelConversion>
HDFS_OBJECT_FILE_TO_UNCACHED_RDD () {
+ return () -> new DefaultChannelConversion(
+ FileChannel.HDFS_OBJECT_FILE_DESCRIPTOR,
+ RddChannel.UNCACHED_DESCRIPTOR,
+ () -> new
SparkObjectFileSource<>(DataSetType.createDefault(Void.class))
+ );
+ }
+
+ public static Collection<Supplier<ChannelConversion>> SUPPLIERS =
Arrays.asList(
+ UNCACHED_RDD_TO_CACHED_RDD(),
+ COLLECTION_TO_BROADCAST(),
+ COLLECTION_TO_UNCACHED_RDD(),
+ UNCACHED_RDD_TO_COLLECTION(),
+ CACHED_RDD_TO_COLLECTION(),
+ CACHED_RDD_TO_HDFS_OBJECT_FILE(),
+ UNCACHED_RDD_TO_HDFS_OBJECT_FILE(),
+ HDFS_OBJECT_FILE_TO_UNCACHED_RDD(),
+// HDFS_TSV_TO_UNCACHED_RDD(),
+ CACHED_RDD_TO_HDFS_TSV(),
+ UNCACHED_RDD_TO_HDFS_TSV()
);
- public static final ChannelConversion COLLECTION_TO_BROADCAST = new
DefaultChannelConversion(
- CollectionChannel.DESCRIPTOR,
- BroadcastChannel.DESCRIPTOR,
- () -> new
SparkBroadcastOperator<>(DataSetType.createDefault(Void.class))
- );
-
- public static final ChannelConversion COLLECTION_TO_UNCACHED_RDD = new
DefaultChannelConversion(
- CollectionChannel.DESCRIPTOR,
- RddChannel.UNCACHED_DESCRIPTOR,
- () -> new
SparkCollectionSource<>(DataSetType.createDefault(Void.class))
- );
-
- public static final ChannelConversion UNCACHED_RDD_TO_COLLECTION = new
DefaultChannelConversion(
- RddChannel.UNCACHED_DESCRIPTOR,
- CollectionChannel.DESCRIPTOR,
- () -> new
SparkCollectOperator<>(DataSetType.createDefault(Void.class))
- );
-
- public static final ChannelConversion CACHED_RDD_TO_COLLECTION = new
DefaultChannelConversion(
- RddChannel.CACHED_DESCRIPTOR,
- CollectionChannel.DESCRIPTOR,
- () -> new
SparkCollectOperator<>(DataSetType.createDefault(Void.class))
- );
-
- public static final ChannelConversion CACHED_RDD_TO_HDFS_TSV = new
DefaultChannelConversion(
- RddChannel.CACHED_DESCRIPTOR,
- FileChannel.HDFS_TSV_DESCRIPTOR,
- () -> new
SparkTsvFileSink<>(DataSetType.createDefaultUnchecked(Tuple2.class))
- );
+ public static Collection<ChannelConversion> ALL = getALL();
- public static final ChannelConversion UNCACHED_RDD_TO_HDFS_TSV = new
DefaultChannelConversion(
- RddChannel.UNCACHED_DESCRIPTOR,
- FileChannel.HDFS_TSV_DESCRIPTOR,
- () -> new
SparkTsvFileSink<>(DataSetType.createDefaultUnchecked(Tuple2.class))
- );
+ public static Collection<ChannelConversion> getALL(){
+ return
SUPPLIERS.stream().map(Supplier::get).collect(Collectors.toList());
+ }
- public static final ChannelConversion HDFS_TSV_TO_UNCACHED_RDD = new
DefaultChannelConversion(
- FileChannel.HDFS_TSV_DESCRIPTOR,
- RddChannel.UNCACHED_DESCRIPTOR,
- () -> new
SparkTsvFileSource(DataSetType.createDefault(Tuple2.class))
- );
- public static final ChannelConversion CACHED_RDD_TO_HDFS_OBJECT_FILE = new
DefaultChannelConversion(
- RddChannel.CACHED_DESCRIPTOR,
- FileChannel.HDFS_OBJECT_FILE_DESCRIPTOR,
- () -> new
SparkObjectFileSink<>(DataSetType.createDefault(Void.class))
- );
-
- public static final ChannelConversion UNCACHED_RDD_TO_HDFS_OBJECT_FILE =
new DefaultChannelConversion(
- RddChannel.UNCACHED_DESCRIPTOR,
- FileChannel.HDFS_OBJECT_FILE_DESCRIPTOR,
- () -> new
SparkObjectFileSink<>(DataSetType.createDefault(Void.class))
- );
-
- public static final ChannelConversion HDFS_OBJECT_FILE_TO_UNCACHED_RDD =
new DefaultChannelConversion(
- FileChannel.HDFS_OBJECT_FILE_DESCRIPTOR,
- RddChannel.UNCACHED_DESCRIPTOR,
- () -> new
SparkObjectFileSource<>(DataSetType.createDefault(Void.class))
- );
-
- public static Collection<ChannelConversion> ALL = Arrays.asList(
- UNCACHED_RDD_TO_CACHED_RDD,
- COLLECTION_TO_BROADCAST,
- COLLECTION_TO_UNCACHED_RDD,
- UNCACHED_RDD_TO_COLLECTION,
- CACHED_RDD_TO_COLLECTION,
- CACHED_RDD_TO_HDFS_OBJECT_FILE,
- UNCACHED_RDD_TO_HDFS_OBJECT_FILE,
- HDFS_OBJECT_FILE_TO_UNCACHED_RDD,
-// HDFS_TSV_TO_UNCACHED_RDD,
- CACHED_RDD_TO_HDFS_TSV,
- UNCACHED_RDD_TO_HDFS_TSV
- );
}
diff --git
a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/Mappings.java
b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/Mappings.java
index 046fb280..4e266405 100644
---
a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/Mappings.java
+++
b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/Mappings.java
@@ -21,8 +21,10 @@ package org.apache.wayang.spark.mapping;
import org.apache.wayang.core.mapping.Mapping;
import org.apache.wayang.spark.mapping.graph.PageRankMapping;
+import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.Collection;
+import java.util.stream.Collectors;
/**
* Register for {@link Mapping}s for this platform.
@@ -63,4 +65,26 @@ public class Mappings {
new PageRankMapping()
);
+ public static Collection<Mapping> getBasicMappings(){
+ return BASIC_MAPPINGS.stream()
+ .map(mapping -> {
+ try {
+ return
mapping.getClass().getConstructor().newInstance();
+ } catch (InstantiationException e) {
+ e.printStackTrace();
+ } catch (IllegalAccessException e) {
+ e.printStackTrace();
+ } catch (InvocationTargetException e) {
+ e.printStackTrace();
+ } catch (NoSuchMethodException e) {
+ e.printStackTrace();
+ }
+ return null;
+ }
+ )
+ .collect(Collectors.toList());
+ }
+
+
+
}
diff --git
a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/platform/SparkPlatform.java
b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/platform/SparkPlatform.java
index 9b574afc..d3c7b791 100644
---
a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/platform/SparkPlatform.java
+++
b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/platform/SparkPlatform.java
@@ -41,6 +41,8 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Set;
/**
@@ -52,12 +54,14 @@ public class SparkPlatform extends Platform {
private static final String CONFIG_NAME = "spark";
- private static final String DEFAULT_CONFIG_FILE =
"wayang-spark-defaults.properties";
+ private String DEFAULT_CONFIG_FILE = "wayang-spark-defaults.properties";
public static final String INITIALIZATION_MS_CONFIG_KEY =
"wayang.spark.init.ms";
private static SparkPlatform instance = null;
+ private static Map<String, SparkPlatform> instances = new HashMap<>();
+
private static final String[] REQUIRED_SPARK_PROPERTIES = {
"spark.master"
};
@@ -95,16 +99,41 @@ public class SparkPlatform extends Platform {
private Logger logger = LogManager.getLogger(this.getClass());
public static SparkPlatform getInstance() {
- if (instance == null) {
- instance = new SparkPlatform();
+ if(instances.isEmpty()) {
+ instances.put("default", new SparkPlatform());
+// if (instance == null) {
+// instance = new SparkPlatform();
+// }
+ return instances.get("default");
+ } else {
+ String first = instances.keySet().stream().findFirst().get();
+ System.out.println("first");
+ System.out.println(first);
+ return instances.get(first);
+ }
+
+// return instance;
+ }
+
+ public static SparkPlatform getInstance(String name, String config) {
+ if (!instances.containsKey(name)) {
+ instances.put(name, new SparkPlatform(name, config));
+// if (instance == null) {
+// instance = new SparkPlatform(name, config);
+// }
}
- return instance;
+ return instances.get(name);
+// return instance;
}
private SparkPlatform() {
super(PLATFORM_NAME, CONFIG_NAME);
}
+ private SparkPlatform(String name, String config) {
+ super(PLATFORM_NAME + " " + name, name, config);
+ }
+
/**
* Configures the single maintained {@link JavaSparkContext} according to
the {@code job} and returns it.
*
@@ -169,6 +198,11 @@ public class SparkPlatform extends Platform {
configuration.load(ReflectionUtils.loadResource(DEFAULT_CONFIG_FILE));
}
+ @Override
+ public void configureCustom(Configuration configuration, String config) {
+ configuration.load(ReflectionUtils.loadResource(config));
+ }
+
@Override
public LoadProfileToTimeConverter
createLoadProfileToTimeConverter(Configuration configuration) {
int cpuMhz = (int)
configuration.getLongProperty("wayang.spark.cpu.mhz");
diff --git
a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/plugin/SparkMultiPlugin.java
b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/plugin/SparkMultiPlugin.java
new file mode 100644
index 00000000..180acdba
--- /dev/null
+++
b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/plugin/SparkMultiPlugin.java
@@ -0,0 +1,47 @@
+package org.apache.wayang.spark.plugin;
+
+import org.apache.wayang.core.api.Configuration;
+import org.apache.wayang.core.mapping.Mapping;
+import org.apache.wayang.core.optimizer.channels.ChannelConversion;
+import org.apache.wayang.core.platform.Platform;
+import org.apache.wayang.core.plugin.Plugin;
+import org.apache.wayang.java.platform.JavaPlatform;
+import org.apache.wayang.spark.channels.ChannelConversions;
+import org.apache.wayang.spark.mapping.Mappings;
+import org.apache.wayang.spark.platform.SparkPlatform;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+public class SparkMultiPlugin implements Plugin {
+
+ String name;
+ String config;
+
+ public SparkMultiPlugin(String name, String config){
+ this.name = name;
+ this.config = config;
+ }
+
+ @Override
+ public Collection<Mapping> getMappings() {
+ return Mappings.getBasicMappings();
+// return Mappings.BASIC_MAPPINGS;
+ }
+
+ @Override
+ public Collection<ChannelConversion> getChannelConversions() {
+
+ return ChannelConversions.ALL;
+ }
+
+ @Override
+ public Collection<Platform> getRequiredPlatforms() {
+ return Arrays.asList(SparkPlatform.getInstance(this.name,
this.config), JavaPlatform.getInstance());
+ }
+
+ @Override
+ public void setProperties(Configuration configuration) {
+ // Nothing to do, because the properties were already configured in
+ SparkPlatform#configureCustom(...).
+ }
+}
diff --git
a/wayang-platforms/wayang-spark/code/main/resources/wayang-sparky-default.properties
b/wayang-platforms/wayang-spark/code/main/resources/wayang-sparky-default.properties
new file mode 100644
index 00000000..47fcdbd0
--- /dev/null
+++
b/wayang-platforms/wayang-spark/code/main/resources/wayang-sparky-default.properties
@@ -0,0 +1,568 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+spark.master = local[*]
+spark.app.name = Wayang App Spark
+spark.ui.showConsoleProgress = false
+spark.driver.allowMultipleContexts=true
+# spark.driver.memory = 1g
+
+wayang.spark.cpu.mhz = 2700
+wayang.spark.machines = 1
+wayang.spark.cores-per-machine = 2
+wayang.spark.hdfs.ms-per-mb = 2.7
+wayang.spark.network.ms-per-mb = 8.6
+wayang.spark.init.ms = 4500
+wayang.spark.stretch = 1
+wayang.spark.costs.fix = 0.0
+wayang.spark.costs.per-ms = 1.0
+
+
+wayang.spark.map.load.template = {\
+ "type":"mathex", "in":1, "out":1,\
+ "cpu":"?*in0 + ?"\
+}
+wayang.spark.map.load = {\
+ "in":1, "out":1,\
+ "cpu":"${700*in0 + 56789}",\
+ "ram":"10000",\
+ "disk":"0",\
+ "net":"${0.2*out0 + 2000}",\
+ "p":0.9,\
+ "overhead":0,\
+ "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0)}"\
+}
+
+wayang.spark.zipwithid.load.template = {\
+ "type":"mathex", "in":1, "out":1,\
+ "cpu":"?*in0 + ?"\
+}
+wayang.spark.zipwithid.load = {\
+ "in":1, "out":1,\
+ "cpu":"${1000*in0 + 56789}",\
+ "ram":"10000",\
+ "disk":"0",\
+ "net":"${0.2*out0 + 2000}",\
+ "p":0.9,\
+ "overhead":0,\
+ "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0)}"\
+}
+
+wayang.spark.mappartitions.load.template = {\
+ "type":"mathex", "in":1, "out":1,\
+ "cpu":"?*in0 + ?"\
+}
+wayang.spark.mappartitions.load = {\
+ "in":1, "out":1,\
+ "cpu":"${600*in0 + 600*out0 + 56789}",\
+ "ram":"10000",\
+ "disk":"0",\
+ "net":"${0.2*out0 + 2000}",\
+ "p":0.9,\
+ "overhead":0,\
+ "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0)}"\
+}
+
+wayang.spark.filter.load.template = {\
+ "type":"mathex", "in":1, "out":1,\
+ "cpu":"?*in0 + ?"\
+}
+wayang.spark.filter.load = {\
+ "in":1, "out":1,\
+ "cpu":"${500*in0 + 56789}",\
+ "ram":"10000",\
+ "disk":"0",\
+ "net":"0",\
+ "p":0.9,\
+ "overhead":0,\
+ "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0)}"\
+}
+
+wayang.spark.flatmap.load.template = {\
+ "type":"mathex", "in":1, "out":1,\
+ "cpu":"?*in0 + ?"\
+}
+wayang.spark.flatmap.load = {\
+ "in":1, "out":1,\
+ "cpu":"${600*in0 + 600*out0 + 56789}",\
+ "ram":"10000",\
+ "disk":"0",\
+ "net":"${0.2 * in0 + 2000}",\
+ "p":0.9,\
+ "overhead":0,\
+ "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0)}"\
+}
+
+
+wayang.spark.bernoulli-sample.load.template = {\
+ "type":"mathex", "in":1, "out":1,\
+ "cpu":"?*in0 + ?"\
+}
+wayang.spark.bernoulli-sample.load = {\
+ "in":1, "out":1,\
+ "cpu":"${700*in0 + 500000000}",\
+ "ram":"10000",\
+ "disk":"0",\
+ "net":"0",\
+ "p":0.9,\
+ "overhead":0,\
+ "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0)}"\
+}
+
+wayang.spark.random-partition-sample.load.template = {\
+ "type":"mathex", "in":1, "out":1,\
+ "cpu":"?*in0 + ?"\
+}
+wayang.spark.random-partition-sample.load = {\
+ "in":1, "out":1,\
+ "cpu":"${700*in0 + 500000000}",\
+ "ram":"10000",\
+ "disk":"0",\
+ "net":"0",\
+ "p":0.9,\
+ "overhead":0,\
+ "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0)}"\
+}
+
+wayang.spark.shuffle-partition-sample.load.template = {\
+ "type":"mathex", "in":1, "out":1,\
+ "cpu":"?*in0 + ?"\
+}
+wayang.spark.shuffle-partition-sample.load = {\
+ "in":1, "out":1,\
+ "cpu":"${699*in0 + 500000000}",\
+ "ram":"10000",\
+ "disk":"0",\
+ "net":"0",\
+ "p":0.9,\
+ "overhead":0,\
+ "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0)}"\
+}
+
+wayang.spark.reduceby.load.template = {\
+ "type":"mathex", "in":1, "out":1,\
+ "cpu":"?*in0 + ?"\
+}
+wayang.spark.reduceby.load = {\
+ "in":1, "out":1,\
+ "cpu":"${1700*in0 + 56789}",\
+ "ram":"10000",\
+ "disk":"${in0}",\
+ "net":"${0.3*in0 + 43000}",\
+ "p":0.9,\
+ "overhead":0,\
+ "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0)}"\
+}
+
+wayang.spark.groupby.load.template = {\
+ "type":"mathex", "in":1, "out":1,\
+ "cpu":"?*in0 + ?"\
+}
+wayang.spark.groupby.load = {\
+ "in":1, "out":1,\
+ "cpu":"${17000*in0 + 56789}",\
+ "ram":"10000",\
+ "disk":"${in0}",\
+ "net":"${0.3*in0 + 430000}",\
+ "p":0.9,\
+ "overhead":0,\
+ "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0)}"\
+}
+
+wayang.spark.sort.load.template = {\
+ "type":"mathex", "in":1, "out":1,\
+ "cpu":"?*in0 + ?"\
+}
+wayang.spark.sort.load = {\
+ "in":1, "out":1,\
+ "cpu":"${1700*in0 + 56789}",\
+ "ram":"10000",\
+ "disk":"${in0}",\
+ "net":"${0.3*in0 + 430000}",\
+ "p":0.9,\
+ "overhead":0,\
+ "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0)}"\
+}
+
+wayang.spark.globalreduce.load.template = {\
+ "type":"mathex", "in":1, "out":1,\
+ "cpu":"?*in0 + ?"\
+}
+wayang.spark.globalreduce.load = {\
+ "in":1, "out":1,\
+ "cpu":"${300*in0 + 56789}",\
+ "ram":"0",\
+ "disk":"0",\
+ "net":"200000",\
+ "p":0.9,\
+ "overhead":5,\
+ "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0)}"\
+}
+
+wayang.spark.globalgroup.load.template = {\
+ "type":"mathex", "in":1, "out":1,\
+ "cpu":"?*in0 + ?"\
+}
+wayang.spark.globalgroup.load = {\
+ "in":1, "out":1,\
+ "cpu":"${400*in0 + 56789}",\
+ "ram":"0",\
+ "disk":"0",\
+ "net":"200000",\
+ "p":0.9,\
+ "overhead":5,\
+ "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0)}"\
+}
+
+wayang.spark.count.load.template = {\
+ "type":"mathex", "in":1, "out":1,\
+ "cpu":"?*in0 + ?"\
+}
+wayang.spark.count.load = {\
+ "in":1, "out":1,\
+ "cpu":"${1000*in0 + 56789}",\
+ "ram":"0",\
+ "disk":"0",\
+ "net":"125000",\
+ "p":0.9,\
+ "overhead":5,\
+ "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0)}"\
+}
+
+wayang.spark.distinct.load.template = {\
+ "type":"mathex", "in":1, "out":1,\
+ "cpu":"?*in0 + ?"\
+}
+wayang.spark.distinct.load = {\
+ "in":1, "out":1,\
+ "cpu":"${1700*in0 + 56789}",\
+ "ram":"0",\
+ "disk":"0",\
+ "net":"${4*in0 + 430000}",\
+ "p":0.9,\
+ "overhead":0,\
+ "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0)}"\
+}
+
+wayang.spark.join.load.template = {\
+ "type":"mathex", "in":2, "out":1,\
+ "cpu":"?*(in0 + in1) + ?*out0 + ?"\
+}
+wayang.spark.join.load = {\
+ "in":2, "out":1,\
+ "cpu":"${1700 * (in0 + in1 + out0) + 56789}",\
+ "ram":"0",\
+ "disk":"${20 * in0}",\
+ "net":"${20 * (in0 + in1 + out0) + 430000}",\
+ "p":0.9,\
+ "overhead":0,\
+ "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0 + in1)}"\
+}
+
+wayang.spark.cogroup.load.template = {\
+ "type":"mathex", "in":2, "out":1,\
+ "cpu":"?*(in0 + in1) + ?*out0 + ?"\
+}
+wayang.spark.cogroup.load = {\
+ "in":2, "out":1,\
+ "cpu":"${1700 * (in0 + in1 + out0) + 56789}",\
+ "ram":"0",\
+ "disk":"${20 * in0}",\
+ "net":"${20 * (in0 + in1 + out0) + 430000}",\
+ "p":0.9,\
+ "overhead":0,\
+ "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0 + in1)}"\
+}
+
+wayang.spark.intersect.load.template = {\
+ "type":"mathex", "in":2, "out":1,\
+ "cpu":"?*(in0 + in1) + ?*out0 + ?"\
+}
+wayang.spark.intersect.load = {\
+ "in":2, "out":1,\
+ "cpu":"${1300 * (in0 + in1 + out0) + 56789}",\
+ "ram":"0",\
+ "disk":"${20 * in0}",\
+ "net":"${20 * (in0 + in1 + out0) + 430000}",\
+ "p":0.9,\
+ "overhead":0,\
+ "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0 + in1)}"\
+}
+
+wayang.spark.cartesian.load.template = {\
+ "type":"mathex", "in":2, "out":1,\
+ "cpu":"?*(in0 + in1) + ?*out0 + ?"\
+}
+wayang.spark.cartesian.load = {\
+ "in":2, "out":1,\
+ "cpu":"${20000000*in0 + 10000000*in1 + 100*out0 + 56789}",\
+ "ram":"0",\
+ "disk":"0",\
+ "net":"${20000*(in0 + in1) + 1700000}",\
+ "p":0.9,\
+ "overhead":1000,\
+ "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0 + in1)}"\
+}
+
+wayang.spark.union.load.template = {\
+ "type":"mathex", "in":2, "out":1,\
+ "cpu":"?"\
+}
+wayang.spark.union.load = {\
+ "in":2, "out":1,\
+ "cpu":"56789",\
+ "ram":"0",\
+ "disk":"0",\
+ "net":"0",\
+ "p":0.9,\
+ "overhead":0,\
+ "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0 + in1)}"\
+}
+
+wayang.spark.broadcast.load.template = {\
+ "type":"mathex", "in":1, "out":1,\
+ "cpu":"?*in0 + ?"\
+}
+wayang.spark.broadcast.load = {\
+ "in":1, "out":1,\
+ "cpu":"${5500*out0 + 56789}",\
+ "ram":"${100*out0 + 12000}",\
+ "disk":"0",\
+ "net":"${9.5*in0 + 45000}",\
+ "p":0.9,\
+ "overhead":5,\
+ "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0)}"\
+}
+
+wayang.spark.cache.load.template = {\
+ "type":"mathex", "in":1, "out":1,\
+ "cpu":"?*in0 + ?"\
+}
+wayang.spark.cache.load = {\
+ "in":1, "out":1,\
+ "cpu":"${4000*out0 + 56789}",\
+ "ram":"10000",\
+ "disk":"0",\
+ "net":"${4.5*in0 + 43000}",\
+ "p":0.9,\
+ "overhead":0,\
+ "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0)}"\
+}
+
+wayang.spark.loop.load.template = {\
+ "type":"mathex", "in":4, "out":3,\
+ "cpu":"?*in3 + ?"\
+}
+wayang.spark.loop.load = {\
+ "in":4, "out":3,\
+ "cpu":"${5000*in3 + 56789}",\
+ "ram":"10000",\
+ "disk":"0",\
+ "net":"${4.5*in0 + 43000}",\
+ "p":0.9,\
+ "overhead":0,\
+ "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0 + in1 + in2 + in3)}"\
+}
+
+wayang.spark.while.load.template = {\
+ "type":"mathex", "in":3, "out":2,\
+ "cpu":"?*in0 + ?"\
+}
+wayang.spark.while.load = {\
+ "in":3, "out":2,\
+ "cpu":"${4000*in2 + 56789}",\
+ "ram":"10000",\
+ "disk":"0",\
+ "net":"${4.5*in0 + 43000}",\
+ "p":0.9,\
+ "overhead":0,\
+ "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0 + in1 + in2)}"\
+}
+
+
+wayang.spark.repeat.load.template = {\
+ "type":"mathex", "in":2, "out":2,\
+ "cpu":"?"\
+}
+wayang.spark.repeat.load = {\
+ "in":2, "out":2,\
+ "cpu":"${810000}",\
+ "ram":"10000",\
+ "p":0.9\
+}
+
+wayang.spark.collectionsource.load.template = {\
+ "type":"mathex", "in":0, "out":1,\
+ "cpu":"?*out0 + ?"\
+}
+wayang.spark.collectionsource.load = {\
+ "in":0, "out":1,\
+ "cpu":"${400*out0 + 56789}",\
+ "ram":"${100*out0 + 2000}",\
+ "disk":"0",\
+ "net":"0",\
+ "p":0.9,\
+ "overhead":0,\
+ "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, out0)}"\
+}
+
+wayang.spark.collect.load.template = {\
+ "type":"mathex", "in":1, "out":1,\
+ "cpu":"?*out0 + ?"\
+}
+wayang.spark.collect.load = {\
+ "in":1, "out":1,\
+ "cpu":"${100*in0 + 56789}",\
+ "ram":"10000",\
+ "disk":"0",\
+ "net":"${4.5*in0 + 43000}",\
+ "p":0.9,\
+ "overhead":5,\
+ "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0)}"\
+}
+
+wayang.spark.localcallbacksink.load.template = {\
+ "type":"mathex", "in":1, "out":0,\
+ "cpu":"?*in0 + ?"\
+}
+wayang.spark.localcallbacksink.load = {\
+ "in":1, "out":0,\
+ "cpu":"${4000*in0 + 56789}",\
+ "ram":"10000",\
+ "disk":"0",\
+ "net":"${4.5*in0 + 43000}",\
+ "p":0.9,\
+ "overhead":5,\
+ "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0)}"\
+}
+
+wayang.spark.textfilesource.load.prepare.template = {\
+ "type":"mathex", "in":0, "out":1,\
+ "cpu":"?"\
+}
+wayang.spark.textfilesource.load.prepare = {\
+ "in":0, "out":1,\
+ "cpu":"${50056789}",\
+ "ram":"${10}",\
+ "disk":"${0}",\
+ "net":"${0}",\
+ "p":0.9,\
+ "overhead":0,\
+ "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, out0)}"\
+}
+wayang.spark.textfilesource.load.main.template = {\
+ "type":"mathex", "in":0, "out":1,\
+ "cpu":"?*out0"\
+}
+wayang.spark.textfilesource.load.main = {\
+ "in":0, "out":1,\
+ "cpu":"${500*out0}",\
+ "ram":"${10}",\
+ "disk":"${0}",\
+ "net":"${0}",\
+ "p":0.9,\
+ "overhead":0,\
+ "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, out0)}"\
+}
+
+
+wayang.spark.objectfilesource.load.template = {\
+ "type":"mathex", "in":0, "out":1,\
+ "cpu":"?*out0 + ?"\
+}
+wayang.spark.objectfilesource.load = {\
+ "in":0, "out":1,\
+ "cpu":"${700*out0 + 56789}",\
+ "ram":"${10*out0}",\
+ "disk":"${out0/10}",\
+ "net":"${out0 * 10 + 5000000}",\
+ "p":0.9,\
+ "overhead":10,\
+ "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, out0)}"\
+}
+
+wayang.spark.objectfilesink.load.template = {\
+ "type":"mathex", "in":1, "out":0,\
+ "cpu":"?*in0 + ?"\
+}
+wayang.spark.objectfilesink.load = {\
+ "in":1, "out":0,\
+ "cpu":"${500*in0 + 56789}",\
+ "ram":"${10*in0}",\
+ "disk":"${in0/10}",\
+ "net":"${in0 * 10 + 5000000}",\
+ "p":0.9,\
+ "overhead":10,\
+ "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0)}"\
+}
+
+wayang.spark.tsvfilesource.load.template = {\
+ "type":"mathex", "in":0, "out":1,\
+ "cpu":"?*out0 + ?"\
+}
+wayang.spark.tsvfilesource.load = {\
+ "in":0, "out":1,\
+ "cpu":"${700*out0 + 56789}",\
+ "ram":"${10*out0}",\
+ "disk":"${out0/10}",\
+ "net":"${out0 * 10 + 5000000}",\
+ "p":0.9,\
+ "overhead":10,\
+ "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, out0)}"\
+}
+
+wayang.spark.tsvfilesink.load.template = {\
+ "type":"mathex", "in":1, "out":0,\
+ "cpu":"?*in0 + ?"\
+}
+wayang.spark.tsvfilesink.load = {\
+ "in":1, "out":0,\
+ "cpu":"${500*in0 + 56789}",\
+ "ram":"${10*in0}",\
+ "disk":"${in0/10}",\
+ "net":"${in0 * 10 + 5000000}",\
+ "p":0.9,\
+ "overhead":10,\
+ "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0)}"\
+}
+
+wayang.spark.pagerank.load.main.template = {\
+ "type":"mathex", "in":1, "out":1,\
+ "cpu":"?*in0 + ?*in0*numIterations + ?"\
+}
+wayang.spark.pagerank.load.main = {\
+ "in":1,\
+ "out":1,\
+ "cpu":"${5000*in0 + 2500*out0 + 1E8}",\
+ "ram":"0",\
+ "disk":"0",\
+ "net":"0",\
+ "p":0.9\
+}
+wayang.spark.pagerank.load.output.template = {\
+ "type":"mathex", "in":1, "out":1,\
+ "cpu":"?*out0"\
+}
+wayang.spark.pagerank.load.output = {\
+ "in":1,\
+ "out":1,\
+ "cpu":"0",\
+ "ram":"0",\
+ "disk":"0",\
+ "net":"0",\
+ "p":0.9\
+}
diff --git a/wayang-platforms/wayang-spark/code/test/java/org/apache/wayang/spark/test/DoubleSpark.java b/wayang-platforms/wayang-spark/code/test/java/org/apache/wayang/spark/test/DoubleSpark.java
new file mode 100644
index 00000000..cf39e24c
--- /dev/null
+++ b/wayang-platforms/wayang-spark/code/test/java/org/apache/wayang/spark/test/DoubleSpark.java
@@ -0,0 +1,50 @@
+package org.apache.wayang.spark.test;
+
+import org.apache.wayang.basic.operators.FilterOperator;
+import org.apache.wayang.basic.operators.LocalCallbackSink;
+import org.apache.wayang.basic.operators.TextFileSource;
+import org.apache.wayang.core.api.WayangContext;
+import org.apache.wayang.core.plan.wayangplan.WayangPlan;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.core.util.ReflectionUtils;
+import org.apache.wayang.java.platform.JavaPlatform;
+import org.apache.wayang.spark.Spark;
+
+import java.util.LinkedList;
+import java.util.List;
+
+public class DoubleSpark {
+ public static void main(String[] args) {
+ List<String> collector = new LinkedList<>();
+ TextFileSource textFileSource = new
TextFileSource("file:///Users/rodrigopardomeza/files/demo.txt");
+ textFileSource.setName("Load file");
+
+ FilterOperator<String> filterOperator = new FilterOperator<>(str ->
!str.isEmpty(), String.class);
+ filterOperator.setName("Filter empty words");
+ filterOperator.addTargetPlatform(Spark.platform("sparky",
"wayang-sparky-default.properties"));
+
+ // write results to a sink
+ LocalCallbackSink<String> sink =
LocalCallbackSink.createCollectingSink(
+ collector,
+ DataSetType.createDefaultUnchecked(String.class)
+ );
+ sink.setName("Collect result");
+
+ // Build Rheem plan by connecting operators
+ textFileSource.connectTo(0, filterOperator, 0);
+ filterOperator.connectTo(0, sink, 0);
+
+ WayangContext wayangContext = new WayangContext();
+ //wayangContext.register(Java.basicPlugin());
+ wayangContext.register(Spark.multiPlugin("sparky",
"wayang-sparky-default.properties"));
+// wayangContext.register(Spark.multiPlugin("other",
"wayang-spark-defaults.properties"));
+// wayangContext.register(Spark.basicPlugin());
+
+ System.out.println("here");
+ wayangContext.execute(new WayangPlan(sink),
ReflectionUtils.getDeclaringJar(JavaPlatform.class));
+
+// collector.sort((t1 , t2) -> Integer.compare(t2.field1, t1.field1));
+ System.out.printf("Found %d words:\n", collector.size());
+ collector.forEach(wc -> System.out.printf("%s\n", wc));
+ }
+}