This is an automated email from the ASF dual-hosted git repository.

rpardomeza pushed a commit to branch multi-spark
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git


The following commit(s) were added to refs/heads/multi-spark by this push:
     new 203e8348 Changes to run current version
203e8348 is described below

commit 203e8348275f351a3bd71163079699d1edc9cfb5
Author: Rodrigo Pardo Meza <[email protected]>
AuthorDate: Thu Jul 7 09:53:56 2022 +0200

    Changes to run current version
---
 .../org/apache/wayang/core/platform/Platform.java  |   8 +
 .../org/apache/wayang/core/test/DummyPlatform.java |   4 +
 .../apache/wayang/java/platform/JavaPlatform.java  |   4 +
 .../apache/wayang/jdbc/test/HsqldbPlatform.java    |   5 +
 .../main/java/org/apache/wayang/spark/Spark.java   |  26 +
 .../wayang/spark/channels/ChannelConversions.java  | 173 ++++---
 .../org/apache/wayang/spark/mapping/Mappings.java  |  24 +
 .../wayang/spark/platform/SparkPlatform.java       |  42 +-
 .../wayang/spark/plugin/SparkMultiPlugin.java      |  47 ++
 .../resources/wayang-sparky-default.properties     | 568 +++++++++++++++++++++
 .../org/apache/wayang/spark/test/DoubleSpark.java  |  50 ++
 11 files changed, 873 insertions(+), 78 deletions(-)

diff --git 
a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/platform/Platform.java
 
b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/platform/Platform.java
index 7a4e1139..728aee03 100644
--- 
a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/platform/Platform.java
+++ 
b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/platform/Platform.java
@@ -60,6 +60,12 @@ public abstract class Platform {
         this.configureDefaults(Configuration.getDefaultConfiguration());
     }
 
+    protected Platform(String name, String configName, String config) {
+        this.name = name;
+        this.configName = configName;
+        this.configureCustom(Configuration.getDefaultConfiguration(), config);
+    }
+
     /**
      * Configure default settings for this instance, e.g., to be able to 
create {@link LoadProfileToTimeConverter}s.
      *
@@ -67,6 +73,8 @@ public abstract class Platform {
      */
     protected abstract void configureDefaults(Configuration configuration);
 
+    protected abstract void configureCustom(Configuration configuration, 
String config);
+
     /**
      * <i>Shortcut.</i> Creates an {@link Executor} using the {@link 
#getExecutorFactory()}.
      *
diff --git 
a/wayang-commons/wayang-core/src/test/java/org/apache/wayang/core/test/DummyPlatform.java
 
b/wayang-commons/wayang-core/src/test/java/org/apache/wayang/core/test/DummyPlatform.java
index c1b381c7..81fdda07 100644
--- 
a/wayang-commons/wayang-core/src/test/java/org/apache/wayang/core/test/DummyPlatform.java
+++ 
b/wayang-commons/wayang-core/src/test/java/org/apache/wayang/core/test/DummyPlatform.java
@@ -48,6 +48,10 @@ public class DummyPlatform extends Platform {
     public void configureDefaults(Configuration configuration) {
     }
 
+    @Override
+    public void configureCustom(Configuration configuration, String lala) {
+    }
+
     @Override
     public Executor.Factory getExecutorFactory() {
         throw new UnsupportedOperationException();
diff --git 
a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/platform/JavaPlatform.java
 
b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/platform/JavaPlatform.java
index 4aba1443..89a6d7e4 100644
--- 
a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/platform/JavaPlatform.java
+++ 
b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/platform/JavaPlatform.java
@@ -56,6 +56,10 @@ public class JavaPlatform extends Platform {
         configuration.load(ReflectionUtils.loadResource(DEFAULT_CONFIG_FILE));
     }
 
+    @Override
+    public void configureCustom(Configuration configuration, String lala) {
+    }
+
     @Override
     public Executor.Factory getExecutorFactory() {
         return job -> new JavaExecutor(this, job);
diff --git 
a/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/test/HsqldbPlatform.java
 
b/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/test/HsqldbPlatform.java
index c83338d0..4c8d318a 100644
--- 
a/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/test/HsqldbPlatform.java
+++ 
b/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/test/HsqldbPlatform.java
@@ -18,6 +18,7 @@
 
 package org.apache.wayang.jdbc.test;
 
+import org.apache.wayang.core.api.Configuration;
 import org.apache.wayang.jdbc.platform.JdbcPlatformTemplate;
 
 /**
@@ -35,6 +36,10 @@ public class HsqldbPlatform extends JdbcPlatformTemplate {
         return instance;
     }
 
+    @Override
+    public void configureCustom(Configuration configuration, String lala) {
+    }
+
     @Override
     protected String getJdbcDriverClassName() {
         return org.hsqldb.jdbcDriver.class.getName();
diff --git 
a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/Spark.java
 
b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/Spark.java
index 3bd3d911..1976c55d 100644
--- 
a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/Spark.java
+++ 
b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/Spark.java
@@ -22,6 +22,10 @@ import org.apache.wayang.spark.platform.SparkPlatform;
 import org.apache.wayang.spark.plugin.SparkBasicPlugin;
 import org.apache.wayang.spark.plugin.SparkConversionPlugin;
 import org.apache.wayang.spark.plugin.SparkGraphPlugin;
+import org.apache.wayang.spark.plugin.SparkMultiPlugin;
+
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * Register for relevant components of this module.
@@ -30,6 +34,8 @@ public class Spark {
 
     private final static SparkBasicPlugin PLUGIN = new SparkBasicPlugin();
 
+    private final static Map<String, SparkMultiPlugin> MULTI_PLUGINS = new 
HashMap<>();
+
     private final static SparkGraphPlugin GRAPH_PLUGIN = new 
SparkGraphPlugin();
 
     private final static SparkConversionPlugin CONVERSION_PLUGIN = new 
SparkConversionPlugin();
@@ -43,6 +49,18 @@ public class Spark {
         return PLUGIN;
     }
 
+    /**
+     * Create a new {@link SparkMultiPlugin} and register it under the given name.
+     *
+     * @param name      the key under which the plugin is registered
+     * @param conf_path the configuration path handed to the plugin
+     * @return the newly created {@link SparkMultiPlugin}
+     */
+    public static SparkMultiPlugin multiPlugin(String name, String conf_path) {
+
+        SparkMultiPlugin MULTI_PLUGIN = new SparkMultiPlugin(name, conf_path);
+        MULTI_PLUGINS.put(name, MULTI_PLUGIN);
+        return MULTI_PLUGIN;
+    }
+
     /**
      * Retrieve the {@link SparkGraphPlugin}.
      *
@@ -70,4 +88,12 @@ public class Spark {
         return SparkPlatform.getInstance();
     }
 
+    /**
+     * Retrieve the {@link SparkPlatform} registered under the given name, creating it
+     * from the given configuration if it does not exist yet.
+     *
+     * @param name   the name identifying the platform instance
+     * @param config the configuration resource used to initialize a new instance
+     * @return the {@link SparkPlatform} for the given name
+     */
+    public static SparkPlatform platform(String name, String config) {
+        return SparkPlatform.getInstance(name, config);
+    }
 }
diff --git 
a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/channels/ChannelConversions.java
 
b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/channels/ChannelConversions.java
index 9f8fca12..33422909 100644
--- 
a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/channels/ChannelConversions.java
+++ 
b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/channels/ChannelConversions.java
@@ -36,89 +36,114 @@ import 
org.apache.wayang.spark.operators.SparkTsvFileSource;
 
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 /**
  * {@link ChannelConversion}s used by the {@link JavaPlatform}.
  */
 public class ChannelConversions {
 
-    public static final ChannelConversion UNCACHED_RDD_TO_CACHED_RDD = new 
DefaultChannelConversion(
-            RddChannel.UNCACHED_DESCRIPTOR,
-            RddChannel.CACHED_DESCRIPTOR,
-            () -> new 
SparkCacheOperator<>(DataSetType.createDefault(Void.class))
+    private static Supplier<ChannelConversion> UNCACHED_RDD_TO_CACHED_RDD (){
+        return () -> new DefaultChannelConversion(
+                RddChannel.UNCACHED_DESCRIPTOR,
+                RddChannel.CACHED_DESCRIPTOR,
+                () -> new 
SparkCacheOperator<>(DataSetType.createDefault(Void.class))
+        );
+    }
+
+    private static Supplier<ChannelConversion> COLLECTION_TO_BROADCAST (){
+        return () -> new DefaultChannelConversion(
+                CollectionChannel.DESCRIPTOR,
+                BroadcastChannel.DESCRIPTOR,
+                () -> new 
SparkBroadcastOperator<>(DataSetType.createDefault(Void.class))
+        );}
+
+    private static Supplier<ChannelConversion> COLLECTION_TO_UNCACHED_RDD (){
+        return () -> new DefaultChannelConversion(
+                CollectionChannel.DESCRIPTOR,
+                RddChannel.UNCACHED_DESCRIPTOR,
+                () -> new 
SparkCollectionSource<>(DataSetType.createDefault(Void.class))
+        );}
+
+    private static Supplier<ChannelConversion>  UNCACHED_RDD_TO_COLLECTION (){
+        return () -> new DefaultChannelConversion(
+                RddChannel.UNCACHED_DESCRIPTOR,
+                CollectionChannel.DESCRIPTOR,
+                () -> new 
SparkCollectOperator<>(DataSetType.createDefault(Void.class))
+        );}
+
+    private static Supplier<ChannelConversion> CACHED_RDD_TO_COLLECTION  (){
+        return () -> new DefaultChannelConversion(
+                RddChannel.CACHED_DESCRIPTOR,
+                CollectionChannel.DESCRIPTOR,
+                () -> new 
SparkCollectOperator<>(DataSetType.createDefault(Void.class))
+        );}
+
+    private static Supplier<ChannelConversion>  CACHED_RDD_TO_HDFS_TSV (){
+        return () ->  new DefaultChannelConversion(
+                RddChannel.CACHED_DESCRIPTOR,
+                FileChannel.HDFS_TSV_DESCRIPTOR,
+                () -> new 
SparkTsvFileSink<>(DataSetType.createDefaultUnchecked(Tuple2.class))
+        );}
+
+    private static Supplier<ChannelConversion> UNCACHED_RDD_TO_HDFS_TSV (){
+        return () -> new DefaultChannelConversion(
+                RddChannel.UNCACHED_DESCRIPTOR,
+                FileChannel.HDFS_TSV_DESCRIPTOR,
+                () -> new 
SparkTsvFileSink<>(DataSetType.createDefaultUnchecked(Tuple2.class))
+        );}
+
+    private static Supplier<ChannelConversion> HDFS_TSV_TO_UNCACHED_RDD (){
+        return () -> new DefaultChannelConversion(
+                FileChannel.HDFS_TSV_DESCRIPTOR,
+                RddChannel.UNCACHED_DESCRIPTOR,
+                () -> new 
SparkTsvFileSource(DataSetType.createDefault(Tuple2.class))
+        );}
+
+    private static Supplier<ChannelConversion> CACHED_RDD_TO_HDFS_OBJECT_FILE  
() {
+        return () -> new DefaultChannelConversion(
+                RddChannel.CACHED_DESCRIPTOR,
+                FileChannel.HDFS_OBJECT_FILE_DESCRIPTOR,
+                () -> new 
SparkObjectFileSink<>(DataSetType.createDefault(Void.class))
+        );
+    }
+
+    private static Supplier<ChannelConversion> 
UNCACHED_RDD_TO_HDFS_OBJECT_FILE () {
+        return () -> new DefaultChannelConversion(
+                RddChannel.UNCACHED_DESCRIPTOR,
+                FileChannel.HDFS_OBJECT_FILE_DESCRIPTOR,
+                () -> new 
SparkObjectFileSink<>(DataSetType.createDefault(Void.class))
+        );
+    }
+
+    private static Supplier<ChannelConversion> 
HDFS_OBJECT_FILE_TO_UNCACHED_RDD () {
+        return () -> new DefaultChannelConversion(
+                FileChannel.HDFS_OBJECT_FILE_DESCRIPTOR,
+                RddChannel.UNCACHED_DESCRIPTOR,
+                () -> new 
SparkObjectFileSource<>(DataSetType.createDefault(Void.class))
+        );
+    }
+
+    public static Collection<Supplier<ChannelConversion>> SUPPLIERS = 
Arrays.asList(
+            UNCACHED_RDD_TO_CACHED_RDD(),
+            COLLECTION_TO_BROADCAST(),
+            COLLECTION_TO_UNCACHED_RDD(),
+            UNCACHED_RDD_TO_COLLECTION(),
+            CACHED_RDD_TO_COLLECTION(),
+            CACHED_RDD_TO_HDFS_OBJECT_FILE(),
+            UNCACHED_RDD_TO_HDFS_OBJECT_FILE(),
+            HDFS_OBJECT_FILE_TO_UNCACHED_RDD(),
+//            HDFS_TSV_TO_UNCACHED_RDD(),
+            CACHED_RDD_TO_HDFS_TSV(),
+            UNCACHED_RDD_TO_HDFS_TSV()
     );
 
-    public static final ChannelConversion COLLECTION_TO_BROADCAST = new 
DefaultChannelConversion(
-            CollectionChannel.DESCRIPTOR,
-            BroadcastChannel.DESCRIPTOR,
-            () -> new 
SparkBroadcastOperator<>(DataSetType.createDefault(Void.class))
-    );
-
-    public static final ChannelConversion COLLECTION_TO_UNCACHED_RDD = new 
DefaultChannelConversion(
-            CollectionChannel.DESCRIPTOR,
-            RddChannel.UNCACHED_DESCRIPTOR,
-            () -> new 
SparkCollectionSource<>(DataSetType.createDefault(Void.class))
-    );
-
-    public static final ChannelConversion UNCACHED_RDD_TO_COLLECTION = new 
DefaultChannelConversion(
-            RddChannel.UNCACHED_DESCRIPTOR,
-            CollectionChannel.DESCRIPTOR,
-            () -> new 
SparkCollectOperator<>(DataSetType.createDefault(Void.class))
-    );
-
-    public static final ChannelConversion CACHED_RDD_TO_COLLECTION = new 
DefaultChannelConversion(
-            RddChannel.CACHED_DESCRIPTOR,
-            CollectionChannel.DESCRIPTOR,
-            () -> new 
SparkCollectOperator<>(DataSetType.createDefault(Void.class))
-    );
-
-    public static final ChannelConversion CACHED_RDD_TO_HDFS_TSV = new 
DefaultChannelConversion(
-            RddChannel.CACHED_DESCRIPTOR,
-            FileChannel.HDFS_TSV_DESCRIPTOR,
-            () -> new 
SparkTsvFileSink<>(DataSetType.createDefaultUnchecked(Tuple2.class))
-    );
+    public static  Collection<ChannelConversion> ALL = getALL();
 
-    public static final ChannelConversion UNCACHED_RDD_TO_HDFS_TSV = new 
DefaultChannelConversion(
-            RddChannel.UNCACHED_DESCRIPTOR,
-            FileChannel.HDFS_TSV_DESCRIPTOR,
-            () -> new 
SparkTsvFileSink<>(DataSetType.createDefaultUnchecked(Tuple2.class))
-    );
+    public static Collection<ChannelConversion> getALL(){
+        return 
SUPPLIERS.stream().map(Supplier::get).collect(Collectors.toList());
+    }
 
-    public static final ChannelConversion HDFS_TSV_TO_UNCACHED_RDD = new 
DefaultChannelConversion(
-            FileChannel.HDFS_TSV_DESCRIPTOR,
-            RddChannel.UNCACHED_DESCRIPTOR,
-            () -> new 
SparkTsvFileSource(DataSetType.createDefault(Tuple2.class))
-    );
 
-    public static final ChannelConversion CACHED_RDD_TO_HDFS_OBJECT_FILE = new 
DefaultChannelConversion(
-            RddChannel.CACHED_DESCRIPTOR,
-            FileChannel.HDFS_OBJECT_FILE_DESCRIPTOR,
-            () -> new 
SparkObjectFileSink<>(DataSetType.createDefault(Void.class))
-    );
-
-    public static final ChannelConversion UNCACHED_RDD_TO_HDFS_OBJECT_FILE = 
new DefaultChannelConversion(
-            RddChannel.UNCACHED_DESCRIPTOR,
-            FileChannel.HDFS_OBJECT_FILE_DESCRIPTOR,
-            () -> new 
SparkObjectFileSink<>(DataSetType.createDefault(Void.class))
-    );
-
-    public static final ChannelConversion HDFS_OBJECT_FILE_TO_UNCACHED_RDD = 
new DefaultChannelConversion(
-            FileChannel.HDFS_OBJECT_FILE_DESCRIPTOR,
-            RddChannel.UNCACHED_DESCRIPTOR,
-            () -> new 
SparkObjectFileSource<>(DataSetType.createDefault(Void.class))
-    );
-
-    public static Collection<ChannelConversion> ALL = Arrays.asList(
-            UNCACHED_RDD_TO_CACHED_RDD,
-            COLLECTION_TO_BROADCAST,
-            COLLECTION_TO_UNCACHED_RDD,
-            UNCACHED_RDD_TO_COLLECTION,
-            CACHED_RDD_TO_COLLECTION,
-            CACHED_RDD_TO_HDFS_OBJECT_FILE,
-            UNCACHED_RDD_TO_HDFS_OBJECT_FILE,
-            HDFS_OBJECT_FILE_TO_UNCACHED_RDD,
-//            HDFS_TSV_TO_UNCACHED_RDD,
-            CACHED_RDD_TO_HDFS_TSV,
-            UNCACHED_RDD_TO_HDFS_TSV
-    );
 }
diff --git 
a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/Mappings.java
 
b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/Mappings.java
index 046fb280..4e266405 100644
--- 
a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/Mappings.java
+++ 
b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/Mappings.java
@@ -21,8 +21,10 @@ package org.apache.wayang.spark.mapping;
 import org.apache.wayang.core.mapping.Mapping;
 import org.apache.wayang.spark.mapping.graph.PageRankMapping;
 
+import java.lang.reflect.InvocationTargetException;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.stream.Collectors;
 
 /**
  * Register for {@link Mapping}s for this platform.
@@ -63,4 +65,26 @@ public class Mappings {
             new PageRankMapping()
     );
 
+    public static Collection<Mapping> getBasicMappings(){
+        return BASIC_MAPPINGS.stream()
+                .map(mapping -> {
+                            try {
+                                return 
mapping.getClass().getConstructor().newInstance();
+                            } catch (InstantiationException e) {
+                                e.printStackTrace();
+                            } catch (IllegalAccessException e) {
+                                e.printStackTrace();
+                            } catch (InvocationTargetException e) {
+                                e.printStackTrace();
+                            } catch (NoSuchMethodException e) {
+                                e.printStackTrace();
+                            }
+                            return null;
+                        }
+                )
+                .collect(Collectors.toList());
+    }
+
+
+
 }
diff --git 
a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/platform/SparkPlatform.java
 
b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/platform/SparkPlatform.java
index 9b574afc..d3c7b791 100644
--- 
a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/platform/SparkPlatform.java
+++ 
b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/platform/SparkPlatform.java
@@ -41,6 +41,8 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Set;
 
 /**
@@ -52,12 +54,14 @@ public class SparkPlatform extends Platform {
 
     private static final String CONFIG_NAME = "spark";
 
-    private static final String DEFAULT_CONFIG_FILE = 
"wayang-spark-defaults.properties";
+    private String DEFAULT_CONFIG_FILE = "wayang-spark-defaults.properties";
 
     public static final String INITIALIZATION_MS_CONFIG_KEY = 
"wayang.spark.init.ms";
 
     private static SparkPlatform instance = null;
 
+    private static Map<String, SparkPlatform> instances = new HashMap<>();
+
     private static final String[] REQUIRED_SPARK_PROPERTIES = {
             "spark.master"
     };
@@ -95,16 +99,41 @@ public class SparkPlatform extends Platform {
     private Logger logger = LogManager.getLogger(this.getClass());
 
     public static SparkPlatform getInstance() {
-        if (instance == null) {
-            instance = new SparkPlatform();
+        if(instances.isEmpty()) {
+            instances.put("default", new SparkPlatform());
+//        if (instance == null) {
+//            instance = new SparkPlatform();
+//        }
+            return instances.get("default");
+        } else {
+            String first = instances.keySet().stream().findFirst().get();
+            System.out.println("first");
+            System.out.println(first);
+            return instances.get(first);
+        }
+
+//        return instance;
+    }
+
+    public static SparkPlatform getInstance(String name, String config) {
+        if (!instances.containsKey(name)) {
+            instances.put(name, new SparkPlatform(name, config));
+//        if (instance == null) {
+//            instance = new SparkPlatform(name, config);
+//        }
         }
-        return instance;
+        return instances.get(name);
+//        return instance;
     }
 
     private SparkPlatform() {
         super(PLATFORM_NAME, CONFIG_NAME);
     }
 
+    private SparkPlatform(String name, String config) {
+        super(PLATFORM_NAME + " " + name, name, config);
+    }
+
     /**
      * Configures the single maintained {@link JavaSparkContext} according to 
the {@code job} and returns it.
      *
@@ -169,6 +198,11 @@ public class SparkPlatform extends Platform {
         configuration.load(ReflectionUtils.loadResource(DEFAULT_CONFIG_FILE));
     }
 
+    @Override
+    public void configureCustom(Configuration configuration, String config) {
+        configuration.load(ReflectionUtils.loadResource(config));
+    }
+
     @Override
     public LoadProfileToTimeConverter 
createLoadProfileToTimeConverter(Configuration configuration) {
         int cpuMhz = (int) 
configuration.getLongProperty("wayang.spark.cpu.mhz");
diff --git 
a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/plugin/SparkMultiPlugin.java
 
b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/plugin/SparkMultiPlugin.java
new file mode 100644
index 00000000..180acdba
--- /dev/null
+++ 
b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/plugin/SparkMultiPlugin.java
@@ -0,0 +1,47 @@
+package org.apache.wayang.spark.plugin;
+
+import org.apache.wayang.core.api.Configuration;
+import org.apache.wayang.core.mapping.Mapping;
+import org.apache.wayang.core.optimizer.channels.ChannelConversion;
+import org.apache.wayang.core.platform.Platform;
+import org.apache.wayang.core.plugin.Plugin;
+import org.apache.wayang.java.platform.JavaPlatform;
+import org.apache.wayang.spark.channels.ChannelConversions;
+import org.apache.wayang.spark.mapping.Mappings;
+import org.apache.wayang.spark.platform.SparkPlatform;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+public class SparkMultiPlugin  implements Plugin {
+
+    String name;
+    String config;
+
+    public SparkMultiPlugin(String name, String config){
+        this.name = name;
+        this.config = config;
+    }
+
+    @Override
+    public Collection<Mapping> getMappings() {
+        return Mappings.getBasicMappings();
+//        return Mappings.BASIC_MAPPINGS;
+    }
+
+    @Override
+    public Collection<ChannelConversion> getChannelConversions() {
+
+        return ChannelConversions.ALL;
+    }
+
+    @Override
+    public Collection<Platform> getRequiredPlatforms() {
+        return Arrays.asList(SparkPlatform.getInstance(this.name, 
this.config), JavaPlatform.getInstance());
+    }
+
+    @Override
+    public void setProperties(Configuration configuration) {
+        // Nothing to do: the named platform loads its properties via 
+#configureCustom(...).
+    }
+}
diff --git 
a/wayang-platforms/wayang-spark/code/main/resources/wayang-sparky-default.properties
 
b/wayang-platforms/wayang-spark/code/main/resources/wayang-sparky-default.properties
new file mode 100644
index 00000000..47fcdbd0
--- /dev/null
+++ 
b/wayang-platforms/wayang-spark/code/main/resources/wayang-sparky-default.properties
@@ -0,0 +1,568 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+spark.master = local[*]
+spark.app.name = Wayang App Spark
+spark.ui.showConsoleProgress = false
+spark.driver.allowMultipleContexts=true
+# spark.driver.memory = 1g
+
+wayang.spark.cpu.mhz = 2700
+wayang.spark.machines = 1
+wayang.spark.cores-per-machine = 2
+wayang.spark.hdfs.ms-per-mb = 2.7
+wayang.spark.network.ms-per-mb = 8.6
+wayang.spark.init.ms = 4500
+wayang.spark.stretch = 1
+wayang.spark.costs.fix = 0.0
+wayang.spark.costs.per-ms = 1.0
+
+
+wayang.spark.map.load.template = {\
+  "type":"mathex", "in":1, "out":1,\
+  "cpu":"?*in0 + ?"\
+}
+wayang.spark.map.load = {\
+  "in":1, "out":1,\
+  "cpu":"${700*in0 + 56789}",\
+  "ram":"10000",\
+  "disk":"0",\
+  "net":"${0.2*out0 + 2000}",\
+  "p":0.9,\
+  "overhead":0,\
+  "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0)}"\
+}
+
+wayang.spark.zipwithid.load.template = {\
+  "type":"mathex", "in":1, "out":1,\
+  "cpu":"?*in0 + ?"\
+}
+wayang.spark.zipwithid.load = {\
+  "in":1, "out":1,\
+  "cpu":"${1000*in0 + 56789}",\
+  "ram":"10000",\
+  "disk":"0",\
+  "net":"${0.2*out0 + 2000}",\
+  "p":0.9,\
+  "overhead":0,\
+  "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0)}"\
+}
+
+wayang.spark.mappartitions.load.template = {\
+  "type":"mathex", "in":1, "out":1,\
+  "cpu":"?*in0 + ?"\
+}
+wayang.spark.mappartitions.load = {\
+  "in":1, "out":1,\
+  "cpu":"${600*in0 + 600*out0 + 56789}",\
+  "ram":"10000",\
+  "disk":"0",\
+  "net":"${0.2*out0 + 2000}",\
+  "p":0.9,\
+  "overhead":0,\
+  "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0)}"\
+}
+
+wayang.spark.filter.load.template = {\
+  "type":"mathex", "in":1, "out":1,\
+  "cpu":"?*in0 + ?"\
+}
+wayang.spark.filter.load = {\
+  "in":1, "out":1,\
+  "cpu":"${500*in0 + 56789}",\
+  "ram":"10000",\
+  "disk":"0",\
+  "net":"0",\
+  "p":0.9,\
+  "overhead":0,\
+  "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0)}"\
+}
+
+wayang.spark.flatmap.load.template = {\
+  "type":"mathex", "in":1, "out":1,\
+  "cpu":"?*in0 + ?"\
+}
+wayang.spark.flatmap.load = {\
+  "in":1, "out":1,\
+  "cpu":"${600*in0 + 600*out0 + 56789}",\
+  "ram":"10000",\
+  "disk":"0",\
+  "net":"${0.2 * in0 + 2000}",\
+  "p":0.9,\
+  "overhead":0,\
+  "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0)}"\
+}
+
+
+wayang.spark.bernoulli-sample.load.template = {\
+  "type":"mathex", "in":1, "out":1,\
+  "cpu":"?*in0 + ?"\
+}
+wayang.spark.bernoulli-sample.load = {\
+  "in":1, "out":1,\
+  "cpu":"${700*in0 + 500000000}",\
+  "ram":"10000",\
+  "disk":"0",\
+  "net":"0",\
+  "p":0.9,\
+  "overhead":0,\
+  "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0)}"\
+}
+
+wayang.spark.random-partition-sample.load.template = {\
+  "type":"mathex", "in":1, "out":1,\
+  "cpu":"?*in0 + ?"\
+}
+wayang.spark.random-partition-sample.load = {\
+  "in":1, "out":1,\
+  "cpu":"${700*in0 + 500000000}",\
+  "ram":"10000",\
+  "disk":"0",\
+  "net":"0",\
+  "p":0.9,\
+  "overhead":0,\
+  "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0)}"\
+}
+
+wayang.spark.shuffle-partition-sample.load.template = {\
+  "type":"mathex", "in":1, "out":1,\
+  "cpu":"?*in0 + ?"\
+}
+wayang.spark.shuffle-partition-sample.load = {\
+  "in":1, "out":1,\
+  "cpu":"${699*in0 + 500000000}",\
+  "ram":"10000",\
+  "disk":"0",\
+  "net":"0",\
+  "p":0.9,\
+  "overhead":0,\
+  "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0)}"\
+}
+
+wayang.spark.reduceby.load.template = {\
+  "type":"mathex", "in":1, "out":1,\
+  "cpu":"?*in0 + ?"\
+}
+wayang.spark.reduceby.load = {\
+  "in":1, "out":1,\
+  "cpu":"${1700*in0 + 56789}",\
+  "ram":"10000",\
+  "disk":"${in0}",\
+  "net":"${0.3*in0 + 43000}",\
+  "p":0.9,\
+  "overhead":0,\
+  "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0)}"\
+}
+
+wayang.spark.groupby.load.template = {\
+  "type":"mathex", "in":1, "out":1,\
+  "cpu":"?*in0 + ?"\
+}
+wayang.spark.groupby.load = {\
+  "in":1, "out":1,\
+  "cpu":"${17000*in0 + 56789}",\
+  "ram":"10000",\
+  "disk":"${in0}",\
+  "net":"${0.3*in0 + 430000}",\
+  "p":0.9,\
+  "overhead":0,\
+  "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0)}"\
+}
+
+wayang.spark.sort.load.template = {\
+  "type":"mathex", "in":1, "out":1,\
+  "cpu":"?*in0 + ?"\
+}
+wayang.spark.sort.load = {\
+  "in":1, "out":1,\
+  "cpu":"${1700*in0 + 56789}",\
+  "ram":"10000",\
+  "disk":"${in0}",\
+  "net":"${0.3*in0 + 430000}",\
+  "p":0.9,\
+  "overhead":0,\
+  "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0)}"\
+}
+
+wayang.spark.globalreduce.load.template = {\
+  "type":"mathex", "in":1, "out":1,\
+  "cpu":"?*in0 + ?"\
+}
+wayang.spark.globalreduce.load = {\
+  "in":1, "out":1,\
+  "cpu":"${300*in0 + 56789}",\
+  "ram":"0",\
+  "disk":"0",\
+  "net":"200000",\
+  "p":0.9,\
+  "overhead":5,\
+  "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0)}"\
+}
+
+wayang.spark.globalgroup.load.template = {\
+  "type":"mathex", "in":1, "out":1,\
+  "cpu":"?*in0 + ?"\
+}
+wayang.spark.globalgroup.load = {\
+  "in":1, "out":1,\
+  "cpu":"${400*in0 + 56789}",\
+  "ram":"0",\
+  "disk":"0",\
+  "net":"200000",\
+  "p":0.9,\
+  "overhead":5,\
+  "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0)}"\
+}
+
+wayang.spark.count.load.template = {\
+  "type":"mathex", "in":1, "out":1,\
+  "cpu":"?*in0 + ?"\
+}
+wayang.spark.count.load = {\
+  "in":1, "out":1,\
+  "cpu":"${1000*in0 + 56789}",\
+  "ram":"0",\
+  "disk":"0",\
+  "net":"125000",\
+  "p":0.9,\
+  "overhead":5,\
+  "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0)}"\
+}
+
+wayang.spark.distinct.load.template = {\
+  "type":"mathex", "in":1, "out":1,\
+  "cpu":"?*in0 + ?"\
+}
+wayang.spark.distinct.load = {\
+  "in":1, "out":1,\
+  "cpu":"${1700*in0 + 56789}",\
+  "ram":"0",\
+  "disk":"0",\
+  "net":"${4*in0 + 430000}",\
+  "p":0.9,\
+  "overhead":0,\
+  "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0)}"\
+}
+
+wayang.spark.join.load.template = {\
+  "type":"mathex", "in":2, "out":1,\
+  "cpu":"?*(in0 + in1) + ?*out0 + ?"\
+}
+wayang.spark.join.load = {\
+  "in":2, "out":1,\
+  "cpu":"${1700 * (in0 + in1 + out0) + 56789}",\
+  "ram":"0",\
+  "disk":"${20 * in0}",\
+  "net":"${20 * (in0 + in1 + out0) + 430000}",\
+  "p":0.9,\
+  "overhead":0,\
+  "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0 + in1)}"\
+}
+
+wayang.spark.cogroup.load.template = {\
+  "type":"mathex", "in":2, "out":1,\
+  "cpu":"?*(in0 + in1) + ?*out0 + ?"\
+}
+wayang.spark.cogroup.load = {\
+  "in":2, "out":1,\
+  "cpu":"${1700 * (in0 + in1 + out0) + 56789}",\
+  "ram":"0",\
+  "disk":"${20 * in0}",\
+  "net":"${20 * (in0 + in1 + out0) + 430000}",\
+  "p":0.9,\
+  "overhead":0,\
+  "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0 + in1)}"\
+}
+
+wayang.spark.intersect.load.template = {\
+  "type":"mathex", "in":2, "out":1,\
+  "cpu":"?*(in0 + in1) + ?*out0 + ?"\
+}
+wayang.spark.intersect.load = {\
+  "in":2, "out":1,\
+  "cpu":"${1300 * (in0 + in1 + out0) + 56789}",\
+  "ram":"0",\
+  "disk":"${20 * in0}",\
+  "net":"${20 * (in0 + in1 + out0) + 430000}",\
+  "p":0.9,\
+  "overhead":0,\
+  "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0 + in1)}"\
+}
+
+wayang.spark.cartesian.load.template = {\
+  "type":"mathex", "in":2, "out":1,\
+  "cpu":"?*(in0 + in1) + ?*out0 + ?"\
+}
+wayang.spark.cartesian.load = {\
+  "in":2, "out":1,\
+  "cpu":"${20000000*in0 + 10000000*in1 + 100*out0 + 56789}",\
+  "ram":"0",\
+  "disk":"0",\
+  "net":"${20000*(in0 + in1) + 1700000}",\
+  "p":0.9,\
+  "overhead":1000,\
+  "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0 + in1)}"\
+}
+
+wayang.spark.union.load.template = {\
+  "type":"mathex", "in":2, "out":1,\
+  "cpu":"?"\
+}
+wayang.spark.union.load = {\
+  "in":2, "out":1,\
+  "cpu":"56789",\
+  "ram":"0",\
+  "disk":"0",\
+  "net":"0",\
+  "p":0.9,\
+  "overhead":0,\
+  "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0 + in1)}"\
+}
+
+wayang.spark.broadcast.load.template = {\
+  "type":"mathex", "in":1, "out":1,\
+  "cpu":"?*in0 + ?"\
+}
+wayang.spark.broadcast.load = {\
+  "in":1, "out":1,\
+  "cpu":"${5500*out0 + 56789}",\
+  "ram":"${100*out0 + 12000}",\
+  "disk":"0",\
+  "net":"${9.5*in0 + 45000}",\
+  "p":0.9,\
+  "overhead":5,\
+  "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0)}"\
+}
+
+wayang.spark.cache.load.template = {\
+  "type":"mathex", "in":1, "out":1,\
+  "cpu":"?*in0 + ?"\
+}
+wayang.spark.cache.load = {\
+  "in":1, "out":1,\
+  "cpu":"${4000*out0 + 56789}",\
+  "ram":"10000",\
+  "disk":"0",\
+  "net":"${4.5*in0 + 43000}",\
+  "p":0.9,\
+  "overhead":0,\
+  "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0)}"\
+}
+
+wayang.spark.loop.load.template = {\
+  "type":"mathex", "in":4, "out":3,\
+  "cpu":"?*in3 + ?"\
+}
+wayang.spark.loop.load = {\
+  "in":4, "out":3,\
+  "cpu":"${5000*in3 + 56789}",\
+  "ram":"10000",\
+  "disk":"0",\
+  "net":"${4.5*in0 + 43000}",\
+  "p":0.9,\
+  "overhead":0,\
+  "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0 + in1 + in2 + in3)}"\
+}
+
+wayang.spark.while.load.template = {\
+  "type":"mathex", "in":3, "out":2,\
+  "cpu":"?*in2 + ?"\
+}
+wayang.spark.while.load = {\
+  "in":3, "out":2,\
+  "cpu":"${4000*in2 + 56789}",\
+  "ram":"10000",\
+  "disk":"0",\
+  "net":"${4.5*in0 + 43000}",\
+  "p":0.9,\
+  "overhead":0,\
+  "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0 + in1 + in2)}"\
+}
+
+
+wayang.spark.repeat.load.template = {\
+  "type":"mathex", "in":2, "out":2,\
+  "cpu":"?"\
+}
+wayang.spark.repeat.load = {\
+  "in":2, "out":2,\
+  "cpu":"${810000}",\
+  "ram":"10000",\
+  "p":0.9\
+}
+
+wayang.spark.collectionsource.load.template = {\
+  "type":"mathex", "in":0, "out":1,\
+  "cpu":"?*out0 + ?"\
+}
+wayang.spark.collectionsource.load = {\
+  "in":0, "out":1,\
+  "cpu":"${400*out0 + 56789}",\
+  "ram":"${100*out0 + 2000}",\
+  "disk":"0",\
+  "net":"0",\
+  "p":0.9,\
+  "overhead":0,\
+  "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, out0)}"\
+}
+
+wayang.spark.collect.load.template = {\
+  "type":"mathex", "in":1, "out":1,\
+  "cpu":"?*out0 + ?"\
+}
+wayang.spark.collect.load = {\
+  "in":1, "out":1,\
+  "cpu":"${100*in0 + 56789}",\
+  "ram":"10000",\
+  "disk":"0",\
+  "net":"${4.5*in0 + 43000}",\
+  "p":0.9,\
+  "overhead":5,\
+  "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0)}"\
+}
+
+wayang.spark.localcallbacksink.load.template = {\
+  "type":"mathex", "in":1, "out":0,\
+  "cpu":"?*in0 + ?"\
+}
+wayang.spark.localcallbacksink.load = {\
+  "in":1, "out":0,\
+  "cpu":"${4000*in0 + 56789}",\
+  "ram":"10000",\
+  "disk":"0",\
+  "net":"${4.5*in0 + 43000}",\
+  "p":0.9,\
+  "overhead":5,\
+  "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0)}"\
+}
+
+wayang.spark.textfilesource.load.prepare.template = {\
+  "type":"mathex", "in":0, "out":1,\
+  "cpu":"?"\
+}
+wayang.spark.textfilesource.load.prepare = {\
+  "in":0, "out":1,\
+  "cpu":"${50056789}",\
+  "ram":"${10}",\
+  "disk":"${0}",\
+  "net":"${0}",\
+  "p":0.9,\
+  "overhead":0,\
+  "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, out0)}"\
+}
+wayang.spark.textfilesource.load.main.template = {\
+  "type":"mathex", "in":0, "out":1,\
+  "cpu":"?*out0"\
+}
+wayang.spark.textfilesource.load.main = {\
+  "in":0, "out":1,\
+  "cpu":"${500*out0}",\
+  "ram":"${10}",\
+  "disk":"${0}",\
+  "net":"${0}",\
+  "p":0.9,\
+  "overhead":0,\
+  "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, out0)}"\
+}
+
+
+wayang.spark.objectfilesource.load.template = {\
+  "type":"mathex", "in":0, "out":1,\
+  "cpu":"?*out0 + ?"\
+}
+wayang.spark.objectfilesource.load = {\
+  "in":0, "out":1,\
+  "cpu":"${700*out0 + 56789}",\
+  "ram":"${10*out0}",\
+  "disk":"${out0/10}",\
+  "net":"${out0 * 10 + 5000000}",\
+  "p":0.9,\
+  "overhead":10,\
+  "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, out0)}"\
+}
+
+wayang.spark.objectfilesink.load.template = {\
+  "type":"mathex", "in":1, "out":0,\
+  "cpu":"?*in0 + ?"\
+}
+wayang.spark.objectfilesink.load = {\
+  "in":1, "out":0,\
+  "cpu":"${500*in0 + 56789}",\
+  "ram":"${10*in0}",\
+  "disk":"${in0/10}",\
+  "net":"${in0 * 10 + 5000000}",\
+  "p":0.9,\
+  "overhead":10,\
+  "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0)}"\
+}
+
+wayang.spark.tsvfilesource.load.template = {\
+  "type":"mathex", "in":0, "out":1,\
+  "cpu":"?*out0 + ?"\
+}
+wayang.spark.tsvfilesource.load = {\
+  "in":0, "out":1,\
+  "cpu":"${700*out0 + 56789}",\
+  "ram":"${10*out0}",\
+  "disk":"${out0/10}",\
+  "net":"${out0 * 10 + 5000000}",\
+  "p":0.9,\
+  "overhead":10,\
+  "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, out0)}"\
+}
+
+wayang.spark.tsvfilesink.load.template = {\
+  "type":"mathex", "in":1, "out":0,\
+  "cpu":"?*in0 + ?"\
+}
+wayang.spark.tsvfilesink.load = {\
+  "in":1, "out":0,\
+  "cpu":"${500*in0 + 56789}",\
+  "ram":"${10*in0}",\
+  "disk":"${in0/10}",\
+  "net":"${in0 * 10 + 5000000}",\
+  "p":0.9,\
+  "overhead":10,\
+  "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0)}"\
+}
+
+wayang.spark.pagerank.load.main.template = {\
+  "type":"mathex", "in":1, "out":1,\
+  "cpu":"?*in0 + ?*in0*numIterations + ?"\
+}
+wayang.spark.pagerank.load.main = {\
+ "in":1,\
+ "out":1,\
+ "cpu":"${5000*in0 + 2500*out0 + 1E8}",\
+ "ram":"0",\
+ "disk":"0",\
+ "net":"0",\
+ "p":0.9\
+}
+wayang.spark.pagerank.load.output.template = {\
+  "type":"mathex", "in":1, "out":1,\
+  "cpu":"?*out0"\
+}
+wayang.spark.pagerank.load.output = {\
+ "in":1,\
+ "out":1,\
+ "cpu":"0",\
+ "ram":"0",\
+ "disk":"0",\
+ "net":"0",\
+ "p":0.9\
+}
diff --git a/wayang-platforms/wayang-spark/code/test/java/org/apache/wayang/spark/test/DoubleSpark.java b/wayang-platforms/wayang-spark/code/test/java/org/apache/wayang/spark/test/DoubleSpark.java
new file mode 100644
index 00000000..cf39e24c
--- /dev/null
+++ b/wayang-platforms/wayang-spark/code/test/java/org/apache/wayang/spark/test/DoubleSpark.java
@@ -0,0 +1,50 @@
+package org.apache.wayang.spark.test;
+
+import org.apache.wayang.basic.operators.FilterOperator;
+import org.apache.wayang.basic.operators.LocalCallbackSink;
+import org.apache.wayang.basic.operators.TextFileSource;
+import org.apache.wayang.core.api.WayangContext;
+import org.apache.wayang.core.plan.wayangplan.WayangPlan;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.core.util.ReflectionUtils;
+import org.apache.wayang.java.platform.JavaPlatform;
+import org.apache.wayang.spark.Spark;
+
+import java.util.LinkedList;
+import java.util.List;
+
+public class DoubleSpark {
+    public static void main(String[] args) {
+        List<String> collector = new LinkedList<>();
+        TextFileSource textFileSource = new 
TextFileSource("file:///Users/rodrigopardomeza/files/demo.txt");
+        textFileSource.setName("Load file");
+
+        FilterOperator<String> filterOperator = new FilterOperator<>(str -> 
!str.isEmpty(), String.class);
+        filterOperator.setName("Filter empty words");
+        filterOperator.addTargetPlatform(Spark.platform("sparky", 
"wayang-sparky-default.properties"));
+
+        // write results to a sink
+        LocalCallbackSink<String> sink = 
LocalCallbackSink.createCollectingSink(
+                collector,
+                DataSetType.createDefaultUnchecked(String.class)
+        );
+        sink.setName("Collect result");
+
+        // Build Rheem plan by connecting operators
+        textFileSource.connectTo(0, filterOperator, 0);
+        filterOperator.connectTo(0, sink, 0);
+
+        WayangContext wayangContext = new WayangContext();
+        //wayangContext.register(Java.basicPlugin());
+        wayangContext.register(Spark.multiPlugin("sparky", 
"wayang-sparky-default.properties"));
+//        wayangContext.register(Spark.multiPlugin("other", 
"wayang-spark-defaults.properties"));
+//        wayangContext.register(Spark.basicPlugin());
+
+        System.out.println("here");
+        wayangContext.execute(new WayangPlan(sink), 
ReflectionUtils.getDeclaringJar(JavaPlatform.class));
+
+//        collector.sort((t1 , t2) -> Integer.compare(t2.field1, t1.field1));
+        System.out.printf("Found %d words:\n", collector.size());
+        collector.forEach(wc -> System.out.printf("%s\n", wc));
+    }
+}

Reply via email to