This is an automated email from the ASF dual-hosted git repository.

glauesppen pushed a commit to branch revert-311-MonitoringHackIT
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit a7cd3828e024b9e199bab2cca92d1f6e88114518
Author: Glaucia Esppenchutz <[email protected]>
AuthorDate: Sun Jun 25 11:11:58 2023 +0100

    Revert "Updates for monitoring the runtime metrics of underlying platform 
(Spark)"
---
 .../main/java/org/apache/wayang/core/api/Job.java  |  22 +-
 .../org/apache/wayang/core/api/WayangContext.java  |  29 --
 .../spark/monitoring/interfaces/Application.java   |  99 ----
 .../spark/monitoring/interfaces/Executor.java      | 140 ------
 .../wayang/spark/monitoring/interfaces/Job.java    | 102 ----
 .../monitoring/interfaces/SerializableObject.java  |  39 --
 .../wayang/spark/monitoring/interfaces/Stage.java  | 177 -------
 .../wayang/spark/monitoring/interfaces/Task.java   | 253 ----------
 .../spark/monitoring/interfaces/TaskMetric.java    | 175 -------
 .../spark/monitoring/metrics/ApplicationEnd.java   | 103 ----
 .../spark/monitoring/metrics/ApplicationStart.java |  96 ----
 .../spark/monitoring/metrics/ExecutorAdded.java    | 131 ------
 .../spark/monitoring/metrics/ExecutorRemoved.java  | 128 -----
 .../spark/monitoring/metrics/ExecutorUpdated.java  | 138 ------
 .../wayang/spark/monitoring/metrics/JobEnd.java    |  89 ----
 .../wayang/spark/monitoring/metrics/JobStart.java  |  88 ----
 .../spark/monitoring/metrics/SparkListener.java    | 524 ---------------------
 .../spark/monitoring/metrics/StageCompleted.java   | 182 -------
 .../monitoring/metrics/StageExecutorMetrics.java   | 165 -------
 .../spark/monitoring/metrics/StageSubmitted.java   | 163 -------
 .../wayang/spark/monitoring/metrics/TaskEnd.java   | 220 ---------
 .../monitoring/metrics/TaskGettingResult.java      | 218 ---------
 .../spark/monitoring/metrics/TaskMetric.java       | 158 -------
 .../wayang/spark/monitoring/metrics/TaskStart.java | 217 ---------
 .../wayang/spark/platform/SparkPlatform.java       |  31 +-
 .../main/resources/wayang-spark-kafka.properties   |  42 --
 wayang-platforms/wayang-spark/pom.xml              |  47 +-
 27 files changed, 44 insertions(+), 3732 deletions(-)

diff --git a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/api/Job.java b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/api/Job.java
index ce20b2df..97454b3a 100644
--- a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/api/Job.java
+++ b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/api/Job.java
@@ -109,7 +109,6 @@ public class Job extends OneTimeExecutable {
      */
     private final WayangPlan wayangPlan;
 
-
     /**
      * {@link OptimizationContext} for the {@link #wayangPlan}.
      */
@@ -162,12 +161,9 @@ public class Job extends OneTimeExecutable {
 
     private Monitor monitor;
 
-
-
     /**
      * Name for this instance.
      */
-    private boolean montiorWithHackIT;
     private final String name;
 
     /**
@@ -203,8 +199,7 @@ public class Job extends OneTimeExecutable {
         for (String udfJar : udfJars) {
             this.addUdfJar(udfJar);
         }
-        // set HackIT debugger enable or disable
-        this.setMontiorWithHackIT(wayangContext.isWithHackITMonitioring());
+
         // Prepare re-optimization.
         if (this.configuration.getBooleanProperty("wayang.core.optimizer.reoptimize")) {
             this.cardinalityBreakpoint = new CardinalityBreakpoint(this.configuration);
@@ -617,21 +612,6 @@ public class Job extends OneTimeExecutable {
         return true;
     }
 
-    /**
-     *  getter method
-     * @return boolean either enable or disable HACKIT for WAYANG Job
-     */
-    public boolean isMontiorWithHackIT() {
-        return montiorWithHackIT;
-    }
-
-    /**
-     * setter method
-     * @param montiorWithHackIT boolean
-     */
-    public void setMontiorWithHackIT(boolean montiorWithHackIT) {
-        this.montiorWithHackIT = montiorWithHackIT;
-    }
     /**
      * Enumerate possible execution plans from the given {@link WayangPlan} and determine the (seemingly) best one.
      */
diff --git a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/api/WayangContext.java b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/api/WayangContext.java
index 4d660b78..f40f5369 100644
--- a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/api/WayangContext.java
+++ b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/api/WayangContext.java
@@ -45,11 +45,6 @@ public class WayangContext {
 
     private final Configuration configuration;
 
-
-    /**
-     * For enabling montioring for debugging using HACKIT!
-     */
-    private boolean withHackITMonitioring;
     public WayangContext() {
         this(new Configuration());
     }
@@ -69,14 +64,6 @@ public class WayangContext {
         return this;
     }
 
-    /**
-     * enable the monitoring for platform tasks
-     * @return WayangContext
-     */
-    public WayangContext withMontioringForHackIT(){
-        this.setWithHackITMonitioring(true);
-        return this;
-    }
     /**
      * Registers the given {@link Plugin} with this instance.
      *
@@ -196,20 +183,4 @@ public class WayangContext {
         }
         return this.cardinalityRepository;
     }
-
-    /**
-     * getter method
-     * @return boolean that depicts HACKIT enable
-     */
-    public boolean isWithHackITMonitioring() {
-        return withHackITMonitioring;
-    }
-
-    /**
-     * setter method
-     * @param withHackITMonitioring is a boolean for HACKIT
-     */
-    public void setWithHackITMonitioring(boolean withHackITMonitioring) {
-        this.withHackITMonitioring = withHackITMonitioring;
-    }
 }
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/interfaces/Application.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/interfaces/Application.java
deleted file mode 100644
index 2d7277b5..00000000
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/interfaces/Application.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.wayang.spark.monitoring.interfaces;
-import java.io.Serializable;
-import java.util.List;
-/**
- * This interface represents an application metrics in a Spark cluster.
- *  The implementing classes must be serializable.
- */
-public interface Application extends Serializable {
-    /**
-     * Sets the name of the event associated with the application.
-     *
-     * @param name the name of the event
-     */
-    void setEventame(String name);
-    /**
-     * Returns the name of the event associated with the application.
-     *
-     * @return the name of the event
-     */
-    String getEventName();
-    /**
-     * Sets the name of the application.
-     *
-     * @param name the name of the application
-     */
-    void setName(String name);
-    /**
-     * Returns the name of the application.
-     *
-     * @return the name of the application
-     */
-    String getName();
-    /**
-     * Sets the start time of the application.
-     *
-     * @param time the start time of the application
-     */
-    void setStartTime(long time);
-    /**
-     * Returns the start time of the application.
-     *
-     * @return the start time of the application
-     */
-    long getTime();
-    /**
-     * Sets the ID of the application.
-     *
-     * @param id the ID of the application
-     */
-    void setAppID(String id);
-    /**
-     * Returns the ID of the application.
-     *
-     * @return the ID of the application
-     */
-    String getAppID();
-    /**
-     * Sets the user associated with the Spark application.
-     *
-     * @param user the user associated with the Spark application
-     */
-    void setSparkUser(String user);
-    /**
-     * Returns the user associated with the Spark application.
-     *
-     * @return the user associated with the Spark application
-     */
-    String getSparkUser();
-    /**
-     * Sets the list of jobs associated with the application.
-     *
-     * @param listOfJobs the list of jobs associated with the application
-     */
-    void setListOfJobs(List<Job> listOfJobs);
-    /**
-     * Returns the list of jobs associated with the application.
-     *
-     * @return the list of jobs associated with the application
-     */
-    List<Job> getListOfjobs();
-}
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/interfaces/Executor.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/interfaces/Executor.java
deleted file mode 100644
index f748afba..00000000
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/interfaces/Executor.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.wayang.spark.monitoring.interfaces;
-
-import java.io.Serializable;
-/**
- * The Executor interface represents an executor in a Spark cluster. It defines methods for setting and getting various attributes of an executor.
- * These attributes include the name of the event, the ID of the stage the executor is running, the ID of the executor itself,
- * the attempt ID of the stage, the time at which the executor started, the host where the executor is running,
- * the total number of cores available to the executor, the resource information of the executor,
- * and the reason for which the executor was removed from the cluster.
- */
-
-public interface Executor extends Serializable {
-    /**
-     * Sets the name of the event associated with this executor.
-     * @param name The name of the event.
-     */
-    void setEventame(String name);
-
-    /**
-     * Returns the name of the event associated with this executor.
-     * @return The name of the event.
-     */
-    String getEventName();
-
-    /**
-     * Sets the ID of the stage the executor is running.
-     * @param id The ID of the stage.
-     */
-    void setStageID(int id);
-
-    /**
-     * Returns the ID of the stage the executor is running.
-     * @return The ID of the stage.
-     */
-    int getStageID();
-
-    /**
-     * Sets the ID of this executor.
-     * @param id The ID of the executor.
-     */
-    void setExecutorID(String id);
-
-    /**
-     * Returns the ID of this executor.
-     * @return The ID of the executor.
-     */
-    String getExecutorID();
-
-    /**
-     * Sets the attempt ID of the stage.
-     * @param id The attempt ID of the stage.
-     */
-    void stageAttempt(int id);
-
-    /**
-     * Returns the attempt ID of the stage.
-     * @return The attempt ID of the stage.
-     */
-    int getStageAttempt();
-
-    /**
-     * Sets the time at which this executor started.
-     * @param Time The start time of the executor.
-     */
-    void executorTime(long Time);
-
-    /**
-     * Returns the time at which this executor started.
-     * @return The start time of the executor.
-     */
-    long getExecutorTime();
-
-    /**
-     * Sets the host where this executor is running.
-     * @param host The host where the executor is running.
-     */
-    void setExecutorHost(String host);
-
-    /**
-     * Returns the host where this executor is running.
-     * @return The host where the executor is running.
-     */
-    String getExecutorHost();
-
-    /**
-     * Sets the total number of cores available to this executor.
-     * @param cores The total number of cores.
-     */
-    void setTotalCores(int cores);
-
-    /**
-     * Returns the total number of cores available to this executor.
-     * @return The total number of cores.
-     */
-    int getTotalCores();
-
-    /**
-     * Sets the resource information of this executor.
-     * @param resourceInfoId The resource information of the executor.
-     */
-    void setResourceInfo(int resourceInfoId);
-
-    /**
-     * Returns the resource information of this executor.
-     * @return The resource information of the executor.
-     */
-    int getResourceInfo();
-
-    /**
-     * Sets the reason for which this executor was removed from the cluster.
-     * @param reasonOfRemoval The reason for removal.
-     */
-    void setReasonOfRemoval(String reasonOfRemoval);
-
-    /**
-     * Returns the reason for which this executor was removed from the cluster.
-     * @return The reason for removal.
-     */
-    String getReasonOfRemoval();
-
-
-}
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/interfaces/Job.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/interfaces/Job.java
deleted file mode 100644
index 9ccebf1f..00000000
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/interfaces/Job.java
+++ /dev/null
@@ -1,102 +0,0 @@
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.wayang.spark.monitoring.interfaces;
-
-import scala.collection.Seq;
-
-import java.io.Serializable;
-import java.util.List;
-/**
- * The Job interface represents a job to be executed in a distributed system.
- * A job comprises one or more stages, and contains metadata about the job
- * such as its ID, product arity, and event name.
- */
-public interface Job extends Serializable {
-
-    /**
-     * Sets the name of the event associated with this job.
-     *
-     * @param name the name of the event
-     */
-    void setEventame(String name);
-
-    /**
-     * Returns the name of the event associated with this job.
-     *
-     * @return the name of the event
-     */
-    String getEventName();
-
-    /**
-     * Sets the unique identifier for this job.
-     *
-     * @param jobID the unique identifier for this job
-     */
-    void setJobID(int jobID);
-
-    /**
-     * Returns the unique identifier for this job.
-     *
-     * @return the unique identifier for this job
-     */
-    int getJobID();
-
-    /**
-     * Sets the number of output products produced by this job.
-     *
-     * @param productArity the number of output products produced by this job
-     */
-    void setProductArity(int productArity);
-
-    /**
-     * Returns the number of output products produced by this job.
-     *
-     * @return the number of output products produced by this job
-     */
-    int getProductArity();
-
-    /**
-     * Sets the stage ID associated with this job.
-     *
-     * @param stageId the stage ID associated with this job
-     */
-    void setStageID(Seq<Object> stageId);
-
-    /**
-     * Returns the stage ID associated with this job.
-     *
-     * @return the stage ID associated with this job
-     */
-    Seq<Object> getStageID();
-
-    /**
-     * Sets the list of stages comprising this job.
-     *
-     * @param listOfStages the list of stages comprising this job
-     */
-    void setListOfStages(List<Stage> listOfStages);
-
-    /**
-     * Returns the list of stages comprising this job.
-     *
-     * @return the list of stages comprising this job
-     */
-    List<Stage> getListOfStages();
-}
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/interfaces/SerializableObject.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/interfaces/SerializableObject.java
deleted file mode 100644
index 2ef8bcbb..00000000
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/interfaces/SerializableObject.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.wayang.spark.monitoring.interfaces;
-
-import java.io.Serializable;
-/**
- * The {@code SerializableObject} interface is a marker interface that indicates
- * that its implementing classes are serializable. By extending the {@link Serializable}
- * interface, classes that implement this interface can be serialized and deserialized
- * to and from a stream of bytes.
- * <p>
- * It is recommended that classes implementing this interface provide a serialVersionUID
- * field to ensure compatibility between serialized objects of different versions.
- * </p>
- * <p>
- * This interface does not define any methods, but instead serves as a tag to indicate
- * that implementing classes can be serialized.
- * </p>
- *
- * @see Serializable
- */
-public interface SerializableObject extends Serializable {
-}
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/interfaces/Stage.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/interfaces/Stage.java
deleted file mode 100644
index 02708194..00000000
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/interfaces/Stage.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.wayang.spark.monitoring.interfaces;
-
-import org.apache.wayang.spark.monitoring.metrics.TaskMetric;
-
-import java.io.Serializable;
-import java.util.List;
-/**
- * This interface represents a stage in a data processing pipeline.
- * A stage is a set of tasks that are executed together as part of a larger computation.
- */
-public interface Stage extends Serializable {
-
-    /**
-     * Sets the name of the event associated with this stage.
-     * @param name the name of the event
-     */
-    void setEventame(String name);
-
-    /**
-     * Gets the name of the event associated with this stage.
-     * @return the name of the event
-     */
-    String getEventName();
-
-    /**
-     * Sets the ID of this stage.
-     * @param ID the ID of the stage
-     */
-    void setID(int ID);
-
-    /**
-     * Gets the ID of this stage.
-     * @return the ID of the stage
-     */
-    int getID();
-
-    /**
-     * Sets the number of tasks associated with this stage.
-     * @param tasks the number of tasks
-     */
-    void setNumberOfTasks(int tasks);
-
-    /**
-     * Gets the number of tasks associated with this stage.
-     * @return the number of tasks
-     */
-    int getNumberOfTasks();
-
-    /**
-     * Sets the name of this stage.
-     * @param name the name of the stage
-     */
-    void setStageName(String name);
-
-    /**
-     * Gets the name of this stage.
-     * @return the name of the stage
-     */
-    String getStageName();
-
-    /**
-     * Sets the status of this stage.
-     * @param status the status of the stage
-     */
-    void setStatus(String status);
-
-    /**
-     * Gets the status of this stage.
-     * @return the status of the stage
-     */
-    String getStatus();
-
-    /**
-     * Sets the details of this stage.
-     * @param details the details of the stage
-     */
-    void setDetails(String details);
-
-    /**
-     * Gets the details of this stage.
-     * @return the details of the stage
-     */
-    String getDetails();
-
-    /**
-     * Sets the submission time of this stage.
-     * @param time the submission time
-     */
-    void setSubmissionTime(long time);
-
-    /**
-     * Gets the submission time of this stage.
-     * @return the submission time
-     */
-    long getSubmissionTime();
-
-    /**
-     * Sets the completion time of this stage.
-     * @param time the completion time
-     */
-    void setCompletionTime(long time);
-
-    /**
-     * Gets the completion time of this stage.
-     * @return the completion time
-     */
-    long getCompletionTime();
-
-    /**
-     * Gets the task metric associated with this stage.
-     * @return the task metric
-     */
-    TaskMetric getTaskMetric();
-
-    /**
-     * Sets the task metric associated with this stage.
-     * @param taskMetric the task metric
-     */
-    void setTaskMetric(TaskMetric taskMetric);
-
-    /**
-     * Sets the ID of the executor associated with this stage.
-     * @param ID the executor ID
-     */
-    void setExecutorID(String ID);
-
-    /**
-     * Gets the ID of the executor associated with this stage.
-     * @return the executor ID
-     */
-    String getExecutorID();
-
-    /**
-     * Sets the ID of the stage attempt.
-     * @param id the stage attempt ID
-     */
-    void setStageAttemptId(int id);
-
-    /**
-     * Gets the ID of the stage attempt.
-     * @return the stage attempt ID
-     */
-    int getStageAttemptId();
-    /**
-     * Sets the list of tasks to be performed.
-     *
-     * @param tasks a List of Task objects representing the tasks to be performed
-     */
-    void setListOfTasks(List<Task> tasks);
-    /**
-     * Retrieves the list of tasks to be performed.
-     *
-     * @return a List of Task objects representing the tasks to be performed
-     */
-    List<Task> getListOfTasks();
-
-
-
-}
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/interfaces/Task.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/interfaces/Task.java
deleted file mode 100644
index 6aa3adbd..00000000
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/interfaces/Task.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.wayang.spark.monitoring.interfaces;
-/**
- * The Task interface represents a task in a distributed computing or data processing system.
- *
- * <p>This interface extends the SerializableObject interface and defines the methods that should
- * be implemented by a task in order to properly function in such a system.
- *
- * <p>The Task interface provides methods for setting and getting various properties of a task,
- * including its ID, status, and execution metrics.
- *
- * <p>The Task interface also includes an enum for representing the status of a task while it is running.
- *
- * @author [Adeel Aslam]
- */
-public interface Task extends SerializableObject{
-    /**
-     * The TaskStatusForRunning enum represents the possible statuses of a task while it is running.
-     *
-     * <p>Each enum value corresponds to a specific status: FAILED, SUCCESS, KILLED, SUCCESSFUL,
-     * RUNNING, FINISHED, or SPECULATIVE.
-     *
-     * <p>The FAILED status indicates that the task has failed and will not be able to complete
-     * successfully. The SUCCESS status indicates that the task has completed successfully and
-     * produced a result. The KILLED status indicates that the task was killed before it could
-     * complete. The SUCCESSFUL status indicates that the task has completed successfully, but
-     * did not produce a result. The RUNNING status indicates that the task is currently running.
-     * The FINISHED status indicates that the task has finished running, but its result has not
-     * yet been obtained. The SPECULATIVE status indicates that the task is running in a speculative
-     * manner, in addition to the primary task.
-     *
-     * @author [Adeel Aslam]
-     */
-    public enum TaskStatusForRunning {
-        FAILED, SUCCESS, KILLED, SUCCESSFUL,RUNNING,FINISHED, SPECULATIVE;
-    }
-    /**
-     * Sets the name of the event associated with this task.
-     *
-     * @param name the name of the event
-     */
-    void setEventame(String name);
-    /**
-     * Gets the name of the event associated with this task.
-     *
-     * @return the name of the event
-     */
-    String getEventName();
-    /**
-     * Sets the ID of this task.
-     *
-     * @param id the task ID
-     */
-    void setID(String id);
-    /**
-     * Gets the ID of this task.
-     *
-     * @return the task ID
-     */
-    String getID();
-    /**
-     * Sets the IP address of the host machine executing this task.
-     *
-     * @param Ip the IP address of the host machine
-     */
-    void setHostIP(String Ip);
-    /**
-     * Gets the IP address of the host machine executing this task.
-     *
-     * @return the IP address of the host machine
-     */
-    String getHostIP();
-    /**
-     * Sets the ID of this task.
-     *
-     * @param taskId the ID of this task
-     */
-    void setTaskID(long taskId);
-    /**
-     * Sets the ID of the stage to which this task belongs.
-     *
-     * @param id the ID of the stage to which this task belongs
-     */
-    void setStageID(int id);
-    /**
-     * Returns the ID of the stage to which this task belongs.
-     *
-     * @return the ID of the stage to which this task belongs
-     */
-    int getStageID();
-    /**
-     * Returns the ID of this task.
-     *
-     * @return the ID of this task
-     */
-    long getTaskID();
-    /**
-     * Sets the ID of the executor assigned to this task.
-     *
-     * @param executorID the ID of the executor assigned to this task
-     */
-    void setStringExecutorID(String executorID);
-    /**
-     * Returns the ID of the executor assigned to this task.
-     *
-     * @return the ID of the executor assigned to this task
-     */
-    String getExecutorID();
-    /**
-     * Sets the status of this task.
-     *
-     * @param status the status of this task
-     */
-    void setTaskStatus(String status);
-    /**
-     * Returns the status of this task.
-     *
-     * @return the status of this task
-     */
-    String getTaskStatus();
-    /**
-     * Sets the index of this task.
-     *
-     * @param index the index of this task
-     */
-    void setIndex(int index);
-    /**
-     * Returns the index of this task.
-     *
-     * @return the index of this task
-     */
-    int getIndex();
-    /**
-     * Sets the partition of this task.
-     *
-     * @param partition the partition of this task
-     */
-    void setPartition(int partition);
-    /**
-     * Returns the partition of this task.
-     *
-     * @return the partition of this task
-     */
-    int getPartition();
-    /**
-     * Sets the launch time of this task.
-     *
-     * @param time the launch time of this task
-     */
-    void setLaunchTime(long time);
-    /**
-     * Returns the launch time of this task.
-     *
-     * @return the launch time of this task
-     */
-    long getLaunchTime();
-    /**
-     * Sets the finish time of this task.
-     *
-     * @param time the finish time of this task
-     */
-    void setFinishTime(long time);
-    /**
-     * Returns the finish time of this task.
-     *
-     * @return the finish time of this task
-     */
-    long getFinishTime();
-    /**
-     * Sets the getting time of this task.
-     *
-     * @param time the getting time of this task
-     */
-    void setGettingTime(long time);
-    /**
-     * Returns the getting time of this task.
-     *
-     * @return the getting time of this task
-     */
-    long getGettingTime();
-    /**
-     * Sets the duration time of this task.
-     *
-     * @param time the duration time of this task
-     */
-    void setDurationTime(long time);
-    /**
-     * Returns the duration time of this task.
-     *
-     * @return the duration time of this task
-     */
-    long getDurationTime();
-    /**
-     * Sets the status of this task.
-     *
-     * @param status the status of this task
-     */
-    void setTaskStatus(boolean status);
-    /**
-     * Returns the status of this task.
-     *
-     * @return the status of this task
-     */
-    boolean getTaskSatus();
-
-    /**
-
-     Sets the task status for running.
-     @param taskStatusForRunning the {@link TaskStatusForRunning} status to be set for the task
-     */
-    void setTaskStatusForRunning(TaskStatusForRunning taskStatusForRunning);
-
-    /**
-
-     Returns the current task status for running.
-     @return the {@link TaskStatusForRunning} status of the task
-     */
-    TaskStatusForRunning getTaskStatusForRunning();
-
-    /**
-
-     Returns the {@link TaskMetric} associated with this task.
-     @return the {@link TaskMetric} of the task
-     */
-    org.apache.wayang.spark.monitoring.metrics.TaskMetric getTaskMetric();
-
-    /**
-
-     Sets the {@link TaskMetric} associated with this task.
-     @param taskMetric the {@link TaskMetric} to be set for the task
-     */
-    void setTaskMetric(org.apache.wayang.spark.monitoring.metrics.TaskMetric taskMetric);
-    // void setTaskRunningStatus(boolean status)
-
-
-}
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/interfaces/TaskMetric.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/interfaces/TaskMetric.java
deleted file mode 100644
index 84f6b93e..00000000
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/interfaces/TaskMetric.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.wayang.spark.monitoring.interfaces;
-
-import java.io.Serializable;
-/**
- * The TaskMetric interface defines the methods used to set and retrieve
- * performance metrics for a given task.
- */
-public interface TaskMetric extends Serializable {
-
-    /**
-     * Sets the number of bytes read by the task.
-     *
-     * @param bytesRead the number of bytes read
-     */
-    void setBytesRead(long bytesRead);
-
-    /**
-     * Gets the number of bytes read by the task.
-     *
-     * @return the number of bytes read
-     */
-    long getByteRead();
-
-    /**
-     * Sets the CPU time used for deserializing the task executor.
-     *
-     * @param executorDeserializeCpuTime the CPU time used for deserializing the executor
-     */
-    void setExecutorDeserializeCpuTime(long executorDeserializeCpuTime );
-
-    /**
-     * Gets the CPU time used for deserializing the task executor.
-     *
-     * @return the CPU time used for deserializing the executor
-     */
-    long getExecutorDeserializeCpuTime();
-
-    /**
-     * Sets the time taken to deserialize the task executor.
-     *
-     * @param executorDeserializeTime the time taken to deserialize the executor
-     */
-    void setExecutorDeserializeTime(long executorDeserializeTime);
-
-    /**
-     * Gets the time taken to deserialize the task executor.
-     *
-     * @return the time taken to deserialize the executor
-     */
-    long getExecutorDeserializeTime();
-
-    /**
-     * Sets the number of bytes spilled to disk by the task.
-     *
-     * @param DiskByteSpilled the number of bytes spilled to disk
-     */
-    void setDiskBytesSpilled(long DiskByteSpilled);
-
-    /**
-     * Gets the number of bytes spilled to disk by the task.
-     *
-     * @return the number of bytes spilled to disk
-     */
-    long getDiskBytesSpilled();
-
-    /**
-     * Sets the total time taken by the task executor to run.
-     *
-     * @param time the time taken by the executor to run
-     */
-    void setExecutorRunTime(long time);
-
-    /**
-     * Gets the total time taken by the task executor to run.
-     *
-     * @return the time taken by the executor to run
-     */
-    long getexecutorRunTime();
-
-    /**
-     * Sets the amount of time spent by the JVM on garbage collection.
-     *
-     * @param time the amount of time spent on garbage collection
-     */
-    void setjvmGCTime(long time);
-
-    /**
-     * Gets the amount of time spent by the JVM on garbage collection.
-     *
-     * @return the amount of time spent on garbage collection
-     */
-    long getJVMGCTime();
-
-    /**
-     * Sets the peak execution memory used by the task executor.
-     *
-     * @param peakExecutionMemory the peak execution memory used by the executor
-     */
-    void setPeakExecutionMemory(long peakExecutionMemory);
-
-    /**
-     * Gets the peak execution memory used by the task executor.
-     *
-     * @return the peak execution memory used by the executor
-     */
-    long getPeakExecutionMemory();
-
-    /**
-     * Sets the size of the result produced by the task.
-     *
-     * @param resultSize the size of the result produced
-     */
-    void setResultSize(long resultSize);
-
-    /**
-     * Gets the size of the result produced by the task.
-     *
-     * @return the size of the result produced
-     */
-    long getResultSize();
-    /**
-     * Sets the time taken to serialize the result of the task.
-     *
-     * @param resultSerializationTime the time taken to serialize the result
-     */
-    void  setResultSerializationTime(long resultSerializationTime);
-    /**
-     * Returns the time taken to serialize the result of the task.
-     *
-     * @return the time taken to serialize the result
-     */
-    long getResultSerializationTime();
-    /**
-     * Sets the number of records written by the task.
-     *
-     * @param recordsWritten the number of records written
-     */
-    void setRecordsWritten(long recordsWritten);
-    /**
-     * Returns the number of records written by the task.
-     *
-     * @return the number of records written
-     */
-    long getRecordsWrittern();
-    /**
-     * Sets the number of bytes written by the task.
-     *
-     * @param bytesWritten the number of bytes written
-     */
-    void setBytesWritten(long bytesWritten);
-    /**
-     * Returns the number of bytes written by the task.
-     *
-     * @return the number of bytes written
-     */
-    long getBytesWrittern ();
-}
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/ApplicationEnd.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/ApplicationEnd.java
deleted file mode 100644
index 6f463c76..00000000
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/ApplicationEnd.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.wayang.spark.monitoring.metrics;
-import org.apache.wayang.spark.monitoring.interfaces.Application;
-import org.apache.wayang.spark.monitoring.interfaces.Job;
-import org.apache.wayang.spark.monitoring.interfaces.SerializableObject;
-
-import java.util.List;
-/**
- * The {@code ApplicationEnd} class represents an application that has ended. It implements
- * the {@link Application} interface and the {@link SerializableObject} interface, indicating
- * that it is an application that can be serialized and deserialized to and from a stream of bytes.
- * <p>
- * This class contains fields to store the name, time, ID, Spark user, event name, and list of jobs
- * associated with the application. It also provides methods to set and get the values of these fields.
- * </p>
- *
- * @see Application
- * @see SerializableObject
- */
-public class ApplicationEnd implements Application, SerializableObject {
-    private String name;
-    private long time;
-    private String id;
-    private String sparkUser;
-    private String eventName;
-    private List<Job> listOfJobs;
-    @Override
-    public void setEventame(String name) {
-        this.eventName=name;
-    }
-
-    @Override
-    public String getEventName() {
-        return name;
-    }
-
-    @Override
-    public void setName(String name) {
-        this.name=name;
-    }
-
-    @Override
-    public String getName() {
-        return name;
-    }
-
-    @Override
-    public void setStartTime(long time) {
-        this.time=time;
-    }
-
-    @Override
-    public long getTime() {
-        return time;
-    }
-
-    @Override
-    public void setAppID(String id) {
-        this.id=id;
-    }
-
-    @Override
-    public String getAppID() {
-        return id;
-    }
-
-    @Override
-    public void setSparkUser(String user) {
-        this.sparkUser=user;
-    }
-
-    @Override
-    public String getSparkUser() {
-        return sparkUser;
-    }
-
-    @Override
-    public void setListOfJobs(List<Job> listOfJobs) {
-        this.listOfJobs=listOfJobs;
-    }
-
-    @Override
-    public List<Job> getListOfjobs() {
-        return listOfJobs;
-    }
-}
-
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/ApplicationStart.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/ApplicationStart.java
deleted file mode 100644
index 6caee437..00000000
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/ApplicationStart.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.wayang.spark.monitoring.metrics;
-import org.apache.wayang.spark.monitoring.interfaces.Application;
-import org.apache.wayang.spark.monitoring.interfaces.Job;
-import org.apache.wayang.spark.monitoring.interfaces.SerializableObject;
-
-import java.util.List;
-/**
- * The ApplicationStart class implements the Application and SerializableObject interfaces. It represents an application start event in a Spark cluster.
- * This class contains information about the name of the application, the time it started, the application ID, the Spark user who started the application,
- * the name of the event, and a list of jobs associated with the application.
- */
-public class ApplicationStart implements Application, SerializableObject {
-    private String name;
-    private long time;
-    private String id;
-    private String sparkUser;
-    private String eventName;
-    private List<Job> listOfJobs;
-    @Override
-    public void setEventame(String name) {
-        this.eventName=name;
-    }
-
-    @Override
-    public String getEventName() {
-        return eventName;
-    }
-
-    @Override
-    public void setName(String name) {
-        this.name=name;
-    }
-
-    @Override
-    public String getName() {
-        return name;
-    }
-
-    @Override
-    public void setStartTime(long time) {
-        this.time=time;
-    }
-
-    @Override
-    public long getTime() {
-        return time;
-    }
-
-    @Override
-    public void setAppID(String id) {
-        this.id=id;
-    }
-
-    @Override
-    public String getAppID() {
-        return id;
-    }
-
-    @Override
-    public void setSparkUser(String user) {
-        this.sparkUser=user;
-    }
-
-    @Override
-    public String getSparkUser() {
-        return sparkUser;
-    }
-
-    @Override
-    public void setListOfJobs(List<Job> listOfJobs) {
-        this.listOfJobs= listOfJobs;
-    }
-
-    @Override
-    public List<Job> getListOfjobs() {
-        return listOfJobs;
-    }
-}
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/ExecutorAdded.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/ExecutorAdded.java
deleted file mode 100644
index 7a644e2f..00000000
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/ExecutorAdded.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.wayang.spark.monitoring.metrics;
-import org.apache.wayang.spark.monitoring.interfaces.Executor;
-import org.apache.wayang.spark.monitoring.interfaces.SerializableObject;
-
-/**
- * The ExecutorAdded class represents an executor added event in a distributed computing system.
- * It implements the Executor interface and the SerializableObject interface.
- *
- * This class contains information about the executor that was added, such as its stage ID, executor ID,
- * stage attempt, time, executor host, total cores, and reason of removal.
- *
- * This class provides getters and setters for all of the above properties, and implements the methods
- * defined in the Executor interface.
- */
-
-public class ExecutorAdded implements Executor, SerializableObject {
-    private int stageId;
-    private String executorID;
-    private int stageAttempt;
-    private long time;
-    private String executorHost;
-    private int totalCores;
-    private String reasonOfRemoval;
-
-    private String eventName;
-
-    @Override
-    public void setEventame(String name) {
-        this.eventName=name;
-    }
-
-    @Override
-    public String getEventName() {
-        return eventName;
-    }
-
-
-    @Override
-    public void setStageID(int id) {
-        this.stageId=id;
-    }
-
-    @Override
-    public int getStageID() {
-        return stageId;
-    }
-
-    @Override
-    public void setExecutorID(String id) {
-        this.executorID=id;
-    }
-
-    @Override
-    public String getExecutorID() {
-        return executorID;
-    }
-
-    @Override
-    public void stageAttempt(int id) {
-        this.stageAttempt=id;
-    }
-
-    @Override
-    public int getStageAttempt() {
-        return stageAttempt;
-    }
-
-    @Override
-    public void executorTime(long Time) {
-        this.time=time;
-    }
-
-    @Override
-    public long getExecutorTime() {
-        return time;
-    }
-
-    @Override
-    public void setExecutorHost(String host) {
-        this.executorHost=host;
-    }
-
-    @Override
-    public String getExecutorHost() {
-        return executorHost;
-    }
-
-    @Override
-    public void setTotalCores(int cores) {
-        this.totalCores=cores;
-    }
-
-    @Override
-    public int getTotalCores() {
-        return totalCores;
-    }
-
-    @Override
-    public void setResourceInfo(int resourceInfoId) {
-
-    }
-    public String getReasonOfRemoval() {
-        return reasonOfRemoval;
-    }
-
-    public void setReasonOfRemoval(String reasonOfRemoval) {
-        this.reasonOfRemoval = reasonOfRemoval;
-    }
-    @Override
-    public int getResourceInfo() {
-        return 0;
-    }
-}
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/ExecutorRemoved.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/ExecutorRemoved.java
deleted file mode 100644
index 1f22236f..00000000
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/ExecutorRemoved.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.wayang.spark.monitoring.metrics;
-import org.apache.wayang.spark.monitoring.interfaces.Executor;
-import org.apache.wayang.spark.monitoring.interfaces.SerializableObject;
-
-/**
- * An event class representing the removal of an executor.
- * Implements the Executor interface and SerializableObject interface.
- */
-public class ExecutorRemoved implements Executor, SerializableObject {
-    private int stageId;
-    private String executorID;
-    private int stageAttempt;
-    private long time;
-    private String executorHost;
-    private int totalCores;
-
-    public String getReasonOfRemoval() {
-        return reasonOfRemoval;
-    }
-
-    public void setReasonOfRemoval(String reasonOfRemoval) {
-        this.reasonOfRemoval = reasonOfRemoval;
-    }
-
-    private String reasonOfRemoval;
-
-    private String eventName;
-
-    @Override
-    public void setEventame(String name) {
-        this.eventName=name;
-    }
-
-    @Override
-    public String getEventName() {
-        return eventName;
-    }
-
-
-    @Override
-    public void setStageID(int id) {
-        this.stageId=id;
-    }
-
-    @Override
-    public int getStageID() {
-        return stageId;
-    }
-
-    @Override
-    public void setExecutorID(String id) {
-        this.executorID=id;
-    }
-
-    @Override
-    public String getExecutorID() {
-        return executorID;
-    }
-
-    @Override
-    public void stageAttempt(int id) {
-        this.stageAttempt=id;
-    }
-
-    @Override
-    public int getStageAttempt() {
-        return stageAttempt;
-    }
-
-    @Override
-    public void executorTime(long Time) {
-        this.time=time;
-    }
-
-    @Override
-    public long getExecutorTime() {
-        return time;
-    }
-
-    @Override
-    public void setExecutorHost(String host) {
-        this.executorHost=host;
-    }
-
-    @Override
-    public String getExecutorHost() {
-        return executorHost;
-    }
-
-    @Override
-    public void setTotalCores(int cores) {
-        this.totalCores=cores;
-    }
-
-    @Override
-    public int getTotalCores() {
-        return totalCores;
-    }
-
-    @Override
-    public void setResourceInfo(int resourceInfoId) {
-
-    }
-
-    @Override
-    public int getResourceInfo() {
-        return 0;
-    }
-}
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/ExecutorUpdated.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/ExecutorUpdated.java
deleted file mode 100644
index 6883db0a..00000000
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/ExecutorUpdated.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.wayang.spark.monitoring.metrics;
-import org.apache.wayang.spark.monitoring.interfaces.Executor;
-import org.apache.wayang.spark.monitoring.interfaces.SerializableObject;
-
-/**
-
- An implementation of the Executor interface that represents an updated executor.
-
- This class contains information about an executor that has been updated, including its
-
- stage ID, executor ID, stage attempt, execution time, host, total cores, reason of removal,
-
- and event name.
-
- @author [Adeel Aslam]
-
- @version 1.0
-
- @since [3-24-2023]
- */
-public class ExecutorUpdated implements Executor, SerializableObject {
-    private int stageId;
-    private String executorID;
-    private int stageAttempt;
-    private long time;
-    private String executorHost;
-    private int totalCores;
-    public String getReasonOfRemoval() {
-        return reasonOfRemoval;
-    }
-
-    public void setReasonOfRemoval(String reasonOfRemoval) {
-        this.reasonOfRemoval = reasonOfRemoval;
-    }
-
-    private String reasonOfRemoval;
-
-    private String eventName;
-
-    @Override
-    public void setEventame(String name) {
-        this.eventName=name;
-    }
-
-    @Override
-    public String getEventName() {
-        return eventName;
-    }
-
-
-    @Override
-    public void setStageID(int id) {
-        this.stageId=id;
-    }
-
-    @Override
-    public int getStageID() {
-        return stageId;
-    }
-
-    @Override
-    public void setExecutorID(String id) {
-        this.executorID=id;
-    }
-
-    @Override
-    public String getExecutorID() {
-        return executorID;
-    }
-
-    @Override
-    public void stageAttempt(int id) {
-        this.stageAttempt=id;
-    }
-
-    @Override
-    public int getStageAttempt() {
-        return stageAttempt;
-    }
-
-    @Override
-    public void executorTime(long Time) {
-        this.time=time;
-    }
-
-    @Override
-    public long getExecutorTime() {
-        return time;
-    }
-
-    @Override
-    public void setExecutorHost(String host) {
-        this.executorHost=host;
-    }
-
-    @Override
-    public String getExecutorHost() {
-        return executorHost;
-    }
-
-    @Override
-    public void setTotalCores(int cores) {
-        this.totalCores=cores;
-    }
-
-    @Override
-    public int getTotalCores() {
-        return totalCores;
-    }
-
-    @Override
-    public void setResourceInfo(int resourceInfoId) {
-
-    }
-
-    @Override
-    public int getResourceInfo() {
-        return 0;
-    }
-}
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/JobEnd.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/JobEnd.java
deleted file mode 100644
index 43229f35..00000000
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/JobEnd.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.wayang.spark.monitoring.metrics;
-import org.apache.wayang.spark.monitoring.interfaces.Job;
-import org.apache.wayang.spark.monitoring.interfaces.SerializableObject;
-import org.apache.wayang.spark.monitoring.interfaces.Stage;
-import scala.collection.Seq;
-
-import java.util.List;
-/**
-
- * JobEnd class represents a job's end in a system.
-
- * It implements the Job and SerializableObject interfaces.
- */
-public class JobEnd implements Job, SerializableObject {
-    private int id;
-    private int productArity;
-    private Seq<Object> seqStageId;
-
-    private String eventName;
-    private List<Stage> listOfStages;
-    @Override
-    public void setEventame(String name) {
-        this.eventName=name;
-    }
-
-    @Override
-    public String getEventName() {
-        return eventName;
-    }
-
-
-    @Override
-    public void setJobID(int jobID) {
-        this.id=jobID;
-    }
-
-    @Override
-    public int getJobID() {
-        return id;
-    }
-
-    @Override
-    public void setProductArity(int productArity) {
-        this.productArity=productArity;
-    }
-
-    @Override
-    public int getProductArity() {
-        return productArity;
-    }
-
-    @Override
-    public void setStageID(Seq<Object> stageId) {
-        this.seqStageId=stageId;
-    }
-
-    @Override
-    public Seq<Object> getStageID() {
-        return seqStageId;
-    }
-
-    @Override
-    public void setListOfStages(List<Stage> listOfStages) {
-        this.listOfStages=listOfStages;
-    }
-
-    @Override
-    public List<Stage> getListOfStages() {
-        return listOfStages;
-    }
-}
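
    The Job, Stage and Task beans in this patch are plain holders that the SparkListener further
    down ships to Kafka as java-serialized byte arrays. As a quick illustration of that contract,
    the sketch below round-trips a JobEnd through the same ObjectOutputStream/ObjectInputStream
    path. It is illustrative only; it assumes the deleted monitoring classes are on the classpath
    and that SerializableObject extends java.io.Serializable (which the listener's use of
    ObjectOutputStream implies).

    // Illustrative sketch, not project code.
    import org.apache.wayang.spark.monitoring.metrics.JobEnd;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;

    public class JobEndRoundTripSketch {
        public static void main(String[] args) throws Exception {
            JobEnd jobEnd = new JobEnd();
            jobEnd.setEventame("JobEnd"); // interface method name as spelled in the deleted code
            jobEnd.setJobID(42);
            jobEnd.setProductArity(2);

            // Serialize to the byte[] form used as the Kafka record value.
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
                oos.writeObject(jobEnd);
            }
            byte[] payload = baos.toByteArray();

            // A consumer would reverse the process.
            try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(payload))) {
                JobEnd restored = (JobEnd) ois.readObject();
                System.out.println(restored.getEventName() + " for job " + restored.getJobID());
            }
        }
    }
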
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/JobStart.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/JobStart.java
deleted file mode 100644
index b83db3bf..00000000
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/JobStart.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.wayang.spark.monitoring.metrics;
-
-import org.apache.wayang.spark.monitoring.interfaces.Job;
-import org.apache.wayang.spark.monitoring.interfaces.SerializableObject;
-import org.apache.wayang.spark.monitoring.interfaces.Stage;
-import scala.collection.Seq;
-
-import java.util.List;
-/**
- * JobStart class represents a job's start in a system.
- * It implements the Job and SerializableObject interfaces.
- */
-public class JobStart implements Job, SerializableObject {
-    private int id;
-    private int productArity;
-    private Seq<Object> seqStageId;
-    private String eventName;
-    private List<Stage> listOfStages;
-
-    @Override
-    public void setEventame(String name) {
-        this.eventName=name;
-    }
-
-    @Override
-    public String getEventName() {
-        return eventName;
-    }
-
-
-    @Override
-    public void setJobID(int jobID) {
-        this.id=jobID;
-    }
-
-    @Override
-    public int getJobID() {
-        return id;
-    }
-
-    @Override
-    public void setProductArity(int productArity) {
-        this.productArity=productArity;
-    }
-
-    @Override
-    public int getProductArity() {
-        return productArity;
-    }
-
-    @Override
-    public void setStageID(Seq<Object> stageId) {
-        this.seqStageId=stageId;
-    }
-
-    @Override
-    public Seq<Object> getStageID() {
-        return seqStageId;
-    }
-
-    @Override
-    public void setListOfStages(List<Stage> listOfStages) {
-        this.listOfStages=listOfStages;
-    }
-
-    @Override
-    public List<Stage> getListOfStages() {
-        return listOfStages;
-    }
-}
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/SparkListener.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/SparkListener.java
deleted file mode 100644
index 6cb182c8..00000000
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/SparkListener.java
+++ /dev/null
@@ -1,524 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.wayang.spark.monitoring.metrics;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.apache.spark.executor.TaskMetrics;
-import org.apache.spark.scheduler.*;
-import org.apache.spark.scheduler.cluster.ExecutorInfo;
-import org.apache.wayang.spark.monitoring.interfaces.Stage;
-import org.apache.wayang.spark.monitoring.interfaces.Task;
-import org.apache.wayang.spark.monitoring.interfaces.*;
-import scala.collection.Seq;
-
-import java.io.*;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-/**
- * A Spark listener implementation that captures events generated during the Spark job execution
- * and sends them to a Kafka topic for further processing.
- */
-public class SparkListener extends org.apache.spark.scheduler.SparkListener {
-    // Member variables to store various data objects
-    private List<Job> listOfJobs;
-    private List<Stage> listOfStages;
-    private List<Task> listOfTasks;
-    private List<SerializableObject> applicationObjects;
-    private List<SerializableObject> jobObjects;
-    private List<SerializableObject> stageObjects;
-    private List<SerializableObject> taskObjects;
-    // Kafka producer to send data to Kafka topic
-    private KafkaProducer<String, byte[]> producer;
-    private final static String KAFKA_PROPERTIES = "/wayang-spark-kafka.properties";
-    // Kafka topic name to which the data will be sent
-    private String kafkaTopic;
-    // Logger instance to log messages
-    protected final Logger logger = LogManager.getLogger(this.getClass());
-    /**
-     * Default constructor that initializes the Kafka producer and various data lists.
-     */
-    public SparkListener(){
-        Properties props = new Properties();
-        try (InputStream inputStream = getClass().getResourceAsStream(KAFKA_PROPERTIES)) {
-            props.load(inputStream);
-        }
-        catch (Exception e){
-            logger.error("This is an error message with an exception.", e);
-        }
-        producer = new KafkaProducer<>(props);
-        this.kafkaTopic = props.getProperty("kafka.topic");
-        this.listOfJobs= new ArrayList<>();
-        this.listOfStages= new ArrayList<>();
-        this.listOfTasks= new ArrayList<>();
-        this.applicationObjects= new ArrayList<>();
-        this.jobObjects= new ArrayList<>();
-        this.stageObjects= new ArrayList<>();
-        this.taskObjects= new ArrayList<>();
-    }
-
-    @Override
-    public void onExecutorBlacklisted(SparkListenerExecutorBlacklisted executorBlacklisted) {
-        super.onExecutorBlacklisted(executorBlacklisted);
-        executorBlacklisted.executorId();
-        executorBlacklisted.time();
-    }
-    /**
-     * Overridden method that captures the event generated when an executor is added in Spark
-     * and sends it to the Kafka topic for further processing.
-     *
-     * @param executorAddedSpark The event that occurred when the executor was added
-     */
-    @Override
-    public void onExecutorAdded(SparkListenerExecutorAdded executorAddedSpark) {
-        super.onExecutorAdded(executorAddedSpark);
-        Executor executorAdded= new ExecutorAdded();
-         executorAdded.setEventame("ExecutorAdded");
-         executorAdded.setExecutorID(executorAddedSpark.executorId());
-        ExecutorInfo executorInfo=executorAddedSpark.executorInfo();
-         executorAdded.setExecutorHost(executorInfo.executorHost());
-         executorAdded.setTotalCores(executorInfo.totalCores());
-        // executorAdded.setResourceInfo(executorInfo.resourceProfileId());
-         executorAdded.setExecutorHost(executorInfo.executorHost());
-         executorAdded.executorTime(executorAddedSpark.time());
-        try {
-            ByteArrayOutputStream boas = new ByteArrayOutputStream();
-            ObjectOutput out = new ObjectOutputStream(boas);
-            out.writeObject(executorAdded);
-            producer.send(new ProducerRecord(kafkaTopic, "ExecutorAdded", boas.toByteArray()));
-        }
-        catch (Exception e){
-            e.printStackTrace();
-        }
-
-    }
-
-    /**
-
-     This method is called when an executor is removed from a Spark application, and it sends information about
-     the removal event to a Kafka topic.
-     @param executorRemovedSpark the SparkListenerExecutorRemoved event containing information about the removed executor
-     */
-    @Override
-    public void onExecutorRemoved(SparkListenerExecutorRemoved executorRemovedSpark) {
-        super.onExecutorRemoved(executorRemovedSpark);
-        Executor executorRemoved= new ExecutorRemoved();
-         executorRemoved.setEventame("ExecutorRemoved");
-         executorRemoved.setExecutorHost(executorRemovedSpark.executorId());
-         executorRemoved.setReasonOfRemoval(executorRemovedSpark.reason());
-         executorRemoved.executorTime(executorRemovedSpark.time());
-        try {
-            ByteArrayOutputStream boas = new ByteArrayOutputStream();
-            ObjectOutput out = new ObjectOutputStream(boas);
-            out.writeObject(executorRemoved);
-            producer.send(new ProducerRecord(kafkaTopic, "ExecutorRemoved", boas.toByteArray()));
-        }
-        catch (Exception e){
-            this.logger.error("Exception {} for executor removed",e);
-        }
-    }
-    /**
-
-     This method is called when metrics are updated for an executor in a Spark application, and it sends information about
-     the updated executor to a Kafka topic.
-     @param executorMetricsUpdateSpark the SparkListenerExecutorMetricsUpdate event containing information about the updated executor's metrics
-     */
-    @Override
-    public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate executorMetricsUpdateSpark) {
-        super.onExecutorMetricsUpdate(executorMetricsUpdateSpark);
-        Executor executorUpdated= new ExecutorUpdated();
-         executorUpdated.setExecutorID(executorMetricsUpdateSpark.execId());
-         executorUpdated.setEventame("ExecutorUpdated");
-        try {
-            ByteArrayOutputStream boas = new ByteArrayOutputStream();
-            ObjectOutput out = new ObjectOutputStream(boas);
-            out.writeObject(executorUpdated);
-            producer.send(new ProducerRecord(kafkaTopic, "ExecutorUpdated", boas.toByteArray()));
-        }
-        catch (Exception e){
-            e.printStackTrace();
-        }
-    }
-    /**
-
-     This method is called when a task starts in a Spark application, and it creates a new TaskStart object with information
-     about the task and adds it to a list of task objects for serialization.
-     @param taskStartSpark the SparkListenerTaskStart event containing information about the started task
-     */
-    @Override
-    public void onTaskStart(SparkListenerTaskStart taskStartSpark) {
-        super.onTaskStart(taskStartSpark);
-        Task taskStart=new TaskStart();
-        TaskInfo taskInfo= taskStartSpark.taskInfo();
-        taskStart.setID(taskInfo.id());
-        taskStart.setEventame("OnTaskStart");
-        taskStart.setHostIP(taskInfo.host());
-        taskStart.setStringExecutorID(taskInfo.executorId());
-        taskStart.setTaskStatus(taskInfo.status());
-        taskStart.setTaskID(taskInfo.taskId());
-        taskStart.setIndex(taskInfo.index());
-        taskStart.setLaunchTime(taskInfo.launchTime());
-        taskStart.setFinishTime(taskInfo.finishTime());
-     //this.taskStart.setDurationTime(taskInfo.duration());
-        taskStart.setGettingTime(taskInfo.gettingResultTime());
-        taskStart.setStageID(taskStartSpark.stageId());
-        //this.taskStart.setPartition(taskInfo.);
-        if(taskInfo.failed()){
-          taskStart.setTaskStatusForRunning(Task.TaskStatusForRunning.FAILED);
-        }
-        else if(taskInfo.finished()){
-           taskStart.setTaskStatusForRunning(Task.TaskStatusForRunning.FINISHED);
-        }
-        else if(taskInfo.killed()){
-           taskStart.setTaskStatusForRunning(Task.TaskStatusForRunning.KILLED);
-        }
-        else  if(taskInfo.running()){
-           taskStart.setTaskStatusForRunning(Task.TaskStatusForRunning.RUNNING);
-        }
-        else if(taskInfo.successful()){
-            taskStart.setTaskStatusForRunning(Task.TaskStatusForRunning.SUCCESSFUL);
-        }
-        else if(taskInfo.speculative()){
-           taskStart.setTaskStatusForRunning(Task.TaskStatusForRunning.SPECULATIVE);
-        }
-        else {
-           taskStart.setTaskStatusForRunning(null);
-        }
-
-        this.taskObjects.add(taskStart);
-
-    }
-
-    /**
-     * This method is called when a Spark application starts. It extends the behavior of the
-     * superclass by passing along the given SparkListenerApplicationStart event to its parent
-     * implementation.
-     *
-     * @param applicationStartSpark the SparkListenerApplicationStart event that was triggered
-     */
-    @Override
-    public void onApplicationStart(SparkListenerApplicationStart applicationStartSpark) {
-        super.onApplicationStart(applicationStartSpark);
-        Application applicationStart= new ApplicationStart();
-         applicationStart.setAppID(applicationStartSpark.appId().get());
-         applicationStart.setEventame("ApplicationStart");
-         applicationStart.setName(applicationStartSpark.appName());
-         applicationStart.setSparkUser(applicationStartSpark.sparkUser());
-         applicationStart.setStartTime(applicationStartSpark.time());
-        this.applicationObjects.add((SerializableObject) applicationStart);
-        // this.jobObjects.add((SerializableObject) this.applicationStart);
-        // this.stageObjects.add((SerializableObject) this.applicationStart);
-        // this.taskObjects.add((SerializableObject) this.applicationStart);
-        // this.objects.add((SerializableObject) this.applicationStart);
-    }
-    /**
-
-     This method is called when a new Spark job starts. It creates a new JobStart object to represent the event,
-     sets its properties using the data provided in the SparkListenerJobStart object, and adds the object to the
-     list of jobs and the list of job objects.
-     @param jobStartSpark the SparkListenerJobStart object containing data about the new job
-     */
-    @Override
-    public void onJobStart(SparkListenerJobStart jobStartSpark) {
-        super.onJobStart(jobStartSpark);
-        Job jobStart= new JobStart();
-         jobStart.setEventame("JobStart");
-         jobStart.setJobID(jobStartSpark.jobId());
-         jobStart.setProductArity(jobStartSpark.productArity());
-         jobStart.setStageID((Seq<Object>) jobStartSpark.stageIds());
-        this.listOfJobs.add(jobStart);
-        this.jobObjects.add((SerializableObject) jobStart);
-    }
-/**
-
- This method is called when a job ends in the Spark application. It creates a new instance of the JobEnd class,
- sets the necessary attributes and adds it to the list of jobs and job objects. Then it serializes the job objects
- and sends them to Kafka. It also resets the job objects and list of stages for the next job.
- @param jobEndSpark a SparkListenerJobEnd object representing the end of a job
- */
-    @Override
-    public void onJobEnd(SparkListenerJobEnd jobEndSpark) {
-        super.onJobEnd(jobEndSpark);
-        Job jobEnd= new JobEnd();
-          jobEnd.setJobID(jobEndSpark.jobId());
-          jobEnd.setEventame("JobEnd");
-          jobEnd.setProductArity(jobEndSpark.productArity());
-          jobEnd.setListOfStages(this.listOfStages);
-        this.listOfJobs.add(jobEnd);
-        this.jobObjects.add((SerializableObject) jobEnd);
-        try {
-
-            ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            ObjectOutputStream oos = new ObjectOutputStream(baos);
-            oos.writeObject(this.jobObjects);
-            producer.send(new ProducerRecord(kafkaTopic, "JobObjects", baos.toByteArray()));
-            this.jobObjects= new ArrayList<>();
-            this.listOfStages= new ArrayList<>();
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-    /**
-
-     Called when a SparkListenerTaskEnd event is triggered.
-     @param taskEndSpark The SparkListenerTaskEnd object representing the event that was triggered.
-     This method overrides the onTaskEnd method from the superclass and performs additional actions:
-     creates a new TaskEnd object and sets its properties based on the information in the taskEndSpark parameter
-     adds the TaskEnd object to the listOfTasks and taskObjects arrays
-     serializes the taskObjects array and sends it to a Kafka producer
-     The TaskEnd object represents the end of a task, and includes information such as the task's ID, event name, host IP,
-     executor ID, status, task ID, index, launch time, finish time, duration time, getting time, and task status for running.
-     The TaskMetric object represents the metrics for the task, and includes information such as the executor CPU time,
-     executor deserialize CPU time, executor deserialize time, disk bytes spilled, executor run time, JVM GC time, peak execution memory,
-     result size, and result serialization time.
-     This method catches and prints any exceptions that may occur during the serialization and sending of the taskObjects array.
-     */
-
-    @Override
-    public void onTaskEnd(SparkListenerTaskEnd taskEndSpark) {
-        super.onTaskEnd(taskEndSpark);
-        Task taskEnd=new TaskEnd();
-        TaskInfo taskInfo= taskEndSpark.taskInfo();
-        taskEnd.setID(taskInfo.id());
-        taskEnd.setEventame("OnTaskEnd");
-        taskEnd.setHostIP(taskInfo.host());
-        taskEnd.setStringExecutorID(taskInfo.executorId());
-        taskEnd.setTaskStatus(taskInfo.status());
-        taskEnd.setTaskID(taskInfo.taskId());
-        taskEnd.setIndex(taskInfo.index());
-        taskEnd.setLaunchTime(taskInfo.launchTime());
-        taskEnd.setFinishTime(taskInfo.finishTime());
-        taskEnd.setDurationTime(taskInfo.duration());
-        taskEnd.setGettingTime(taskInfo.gettingResultTime());
-        //  this. taskGettingResult.setStageID(taskGettingResult.stageId());
-        //this.taskEnd.setPartition(taskInfo.partitionId());
-        if(taskInfo.failed()){
-          taskEnd.setTaskStatusForRunning(Task.TaskStatusForRunning.FAILED);
-        }
-        else if(taskInfo.finished()){
-           taskEnd.setTaskStatusForRunning(Task.TaskStatusForRunning.FINISHED);
-        }
-        else if(taskInfo.killed()){
-          taskEnd.setTaskStatusForRunning(Task.TaskStatusForRunning.KILLED);
-        }
-        else  if(taskInfo.running()){
-           taskEnd.setTaskStatusForRunning(Task.TaskStatusForRunning.RUNNING);
-        }
-        else if(taskInfo.successful()){
-           taskEnd.setTaskStatusForRunning(Task.TaskStatusForRunning.SUCCESSFUL);
-        }
-        else if(taskInfo.speculative()){
-            taskEnd.setTaskStatusForRunning(Task.TaskStatusForRunning.SPECULATIVE);
-        }
-        else {
-           taskEnd.setTaskStatusForRunning(null);
-        }
-
-
-        TaskMetrics taskMetrics= taskEndSpark.taskMetrics();
-        TaskMetric taskMetric= new TaskMetric();
-        taskMetric.setExecutorCPUTime(taskMetrics.executorCpuTime());
-        taskMetric.setExecutorDeserializeCpuTime(taskMetrics.executorDeserializeCpuTime());
-        taskMetric.setExecutorDeserializeTime(taskMetrics.executorDeserializeTime());
-        taskMetric.setDiskBytesSpilled(taskMetrics.diskBytesSpilled());
-        taskMetric.setExecutorRunTime(taskMetrics.executorRunTime());
-        taskMetric.setjvmGCTime(taskMetrics.jvmGCTime());
-        taskMetric.setPeakExecutionMemory(taskMetrics.peakExecutionMemory());
-        taskMetric.setResultSize(taskMetrics.resultSize());
-        taskMetric.setResultSerializationTime(taskMetrics.resultSerializationTime());
-        taskEnd.setTaskMetric(taskMetric);
-        this.listOfTasks.add(taskEnd);
-        this.taskObjects.add(taskEnd);
-        try {
-
-            ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            ObjectOutputStream oos = new ObjectOutputStream(baos);
-            oos.writeObject(this.taskObjects);
-            producer.send(new ProducerRecord(kafkaTopic, "TaskEnd", baos.toByteArray()));
-            this.taskObjects= new ArrayList<>();
-
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-
-    }
-    /**
-
-     Overrides the onStageCompleted method from SparkListener to customize handling of
-     stage completion events. Creates a StageCompleted object and sets its properties based on the
-     StageInfo and TaskMetrics of the completed stage. Adds the StageCompleted object to a list of stages
-     and adds it to a list of SerializableObjects to be sent to a Kafka producer.
-     @param stageCompletedSpark the SparkListenerStageCompleted event to be handled
-     */
-    @Override
-    public void onStageCompleted(SparkListenerStageCompleted stageCompletedSpark) {
-        super.onStageCompleted(stageCompletedSpark);
-        Stage stageCompleted= new StageCompleted();
-        StageInfo stageInfo=stageCompletedSpark.stageInfo();
-        stageCompleted.setDetails(stageInfo.details());
-        stageCompleted.setEventame("OnStageCompleted");
-        stageCompleted.setStageName(stageInfo.name());
-         stageCompleted.setStatus(stageInfo.getStatusString());
-         stageCompleted.setNumberOfTasks(stageInfo.numTasks());
-         stageCompleted.setID(stageInfo.stageId());
-         stageCompleted.setSubmissionTime((Long) stageInfo.submissionTime().get());
-        stageCompleted.setCompletionTime((Long) stageInfo.completionTime().get());
-        TaskMetrics taskMetrics= stageCompletedSpark.stageInfo().taskMetrics();
-        TaskMetric taskMetric= new TaskMetric();
-        taskMetric.setExecutorCPUTime(taskMetrics.executorCpuTime());
-        taskMetric.setExecutorDeserializeCpuTime(taskMetrics.executorDeserializeCpuTime());
-        taskMetric.setExecutorDeserializeTime(taskMetrics.executorDeserializeTime());
-        taskMetric.setDiskBytesSpilled(taskMetrics.diskBytesSpilled());
-        taskMetric.setExecutorRunTime(taskMetrics.executorRunTime());
-        taskMetric.setjvmGCTime(taskMetrics.jvmGCTime());
-        taskMetric.setPeakExecutionMemory(taskMetrics.peakExecutionMemory());
-        taskMetric.setResultSize(taskMetrics.resultSize());
-        taskMetric.setResultSerializationTime(taskMetrics.resultSerializationTime());
-        stageCompleted.setTaskMetric(taskMetric);
-        this.listOfStages.add(stageCompleted);
-         stageCompleted.setListOfTasks(this.listOfTasks);
-        this.stageObjects.add((SerializableObject) stageCompleted);
-        try {
-            ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            ObjectOutputStream oos = new ObjectOutputStream(baos);
-            oos.writeObject(this.stageObjects);
-            producer.send(new ProducerRecord(kafkaTopic, "Stage", baos.toByteArray()));
-            this.stageObjects= new ArrayList<>();
-            this.listOfTasks= new ArrayList<>();
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-    /**
-     * This method is called whenever a new stage is submitted to the Spark engine. It adds the details of the
-     * submitted stage to a list of stages and a list of stage objects.
-     *
-     * @param stageSubmittedSpark the SparkListenerStageSubmitted object containing information about the submitted stage
-     */
-    @Override
-    public void onStageSubmitted(SparkListenerStageSubmitted stageSubmittedSpark) {
-        super.onStageSubmitted(stageSubmittedSpark);
-        Stage stageSubmitted= new StageSubmitted();
-        StageInfo stageInfo=stageSubmittedSpark.stageInfo();
-        stageSubmitted.setDetails(stageInfo.details());
-        stageSubmitted.setEventame("OnStageSubmitted");
-        stageSubmitted.setStageName(stageInfo.name());
-        stageSubmitted.setStatus(stageInfo.getStatusString());
-        stageSubmitted.setNumberOfTasks(stageInfo.numTasks());
-        stageSubmitted.setID(stageInfo.stageId());
-        stageSubmitted.setSubmissionTime((Long) stageInfo.submissionTime().get());
-        //this.stageSubmitted.setCompletionTime((Long) stageInfo.completionTime().get());
-        TaskMetrics taskMetrics= stageSubmittedSpark.stageInfo().taskMetrics();
-        TaskMetric taskMetric= new TaskMetric();
-        taskMetric.setExecutorCPUTime(taskMetrics.executorCpuTime());
-        taskMetric.setExecutorDeserializeCpuTime(taskMetrics.executorDeserializeCpuTime());
-        taskMetric.setExecutorDeserializeTime(taskMetrics.executorDeserializeTime());
-        taskMetric.setDiskBytesSpilled(taskMetrics.diskBytesSpilled());
-        taskMetric.setExecutorRunTime(taskMetrics.executorRunTime());
-        taskMetric.setjvmGCTime(taskMetrics.jvmGCTime());
-        taskMetric.setPeakExecutionMemory(taskMetrics.peakExecutionMemory());
-        taskMetric.setResultSize(taskMetrics.resultSize());
-        taskMetric.setResultSerializationTime(taskMetrics.resultSerializationTime());
-        stageSubmitted.setTaskMetric(taskMetric);
-        this.listOfStages.add(stageSubmitted);
-        this.stageObjects.add((SerializableObject) stageSubmitted);
-
-    }
-
-    /**
-     * This method is called when the Spark application ends. It creates a new ApplicationEnd object containing
-     * the start time of the application and a list of jobs, and adds the object to a list of application objects.
-     * It then serializes the list of application objects and sends it to a Kafka topic. Finally, it clears the
-     * lists of application and job objects to prepare for the next application run.
-     *
-     * @param applicationEndSpark the SparkListenerApplicationEnd object containing information about the end of the application
-     */
-    @Override
-    public void onApplicationEnd(SparkListenerApplicationEnd applicationEndSpark) {
-        super.onApplicationEnd(applicationEndSpark);
-        Application applicationEnd=new ApplicationEnd();
-        applicationEnd.setStartTime(applicationEndSpark.time());
-        applicationEnd.setListOfJobs(this.listOfJobs);
-        this.applicationObjects.add((SerializableObject) applicationEnd);
-        try {
-
-            ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            ObjectOutputStream oos = new ObjectOutputStream(baos);
-            oos.writeObject(this.applicationObjects);
-            producer.send(new ProducerRecord(kafkaTopic, "ApplicationObjects", baos.toByteArray()));
-            this.applicationObjects= new ArrayList<>();
-            this.listOfJobs= new ArrayList<>();
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-
-
-    }
-    /**
-
-     Called when a task's result is being fetched. Adds a new Task object to the listOfTasks and taskObjects.
-     @param taskGettingResultSpark The SparkListenerTaskGettingResult object containing information about the task result.
-     */
-    @Override
-    public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResultSpark) {
-        super.onTaskGettingResult(taskGettingResultSpark);
-        Task taskGettingResult=new TaskGettingResult();
-        TaskInfo taskInfo= taskGettingResultSpark.taskInfo();
-        taskGettingResult.setID(taskInfo.id());
-        taskGettingResult.setEventame("OnTaskGettingResult");
-        taskGettingResult.setHostIP(taskInfo.host());
-        taskGettingResult.setStringExecutorID(taskInfo.executorId());
-        taskGettingResult.setTaskStatus(taskInfo.status());
-        taskGettingResult.setTaskID(taskInfo.taskId());
-        taskGettingResult.setIndex(taskInfo.index());
-        taskGettingResult.setLaunchTime(taskInfo.launchTime());
-        taskGettingResult.setFinishTime(taskInfo.finishTime());
-        taskGettingResult.setDurationTime(taskInfo.duration());
-       taskGettingResult.setGettingTime(taskInfo.gettingResultTime());
-        //  this. taskGettingResult.setStageID(taskGettingResult.stageId());
-       // this.taskGettingResult.setPartition(taskInfo.partitionId());
-        if(taskInfo.failed()){
-           taskGettingResult.setTaskStatusForRunning(Task.TaskStatusForRunning.FAILED);
-        }
-        else if(taskInfo.finished()){
-            taskGettingResult.setTaskStatusForRunning(Task.TaskStatusForRunning.FINISHED);
-        }
-        else if(taskInfo.killed()){
-           taskGettingResult.setTaskStatusForRunning(Task.TaskStatusForRunning.KILLED);
-        }
-        else  if(taskInfo.running()){
-           taskGettingResult.setTaskStatusForRunning(Task.TaskStatusForRunning.RUNNING);
-        }
-        else if(taskInfo.successful()){
-           taskGettingResult.setTaskStatusForRunning(Task.TaskStatusForRunning.SUCCESSFUL);
-        }
-        else if(taskInfo.speculative()){
-            taskGettingResult.setTaskStatusForRunning(Task.TaskStatusForRunning.SPECULATIVE);
-        }
-        else {
-          taskGettingResult.setTaskStatusForRunning(null);
-        }
-        this.listOfTasks.add(taskGettingResult);
-        this.taskObjects.add( taskGettingResult);
-
-    }
-}
\ No newline at end of file
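
    For orientation, the listener above only needs its no-argument constructor, so it can be
    attached through Spark's standard spark.extraListeners setting; the sketch below shows one
    way to do that. The contents of wayang-spark-kafka.properties are not part of the hunks
    shown here, so apart from kafka.topic (which the constructor reads) the property names in
    the comment are assumptions based on what a KafkaProducer<String, byte[]> requires.

    // Minimal wiring sketch (not the project's actual bootstrap code): registers the
    // listener via spark.extraListeners, which instantiates it through its no-arg constructor.
    //
    // Assumed shape of wayang-spark-kafka.properties (only kafka.topic is read explicitly
    // by the listener; the rest is standard producer configuration):
    //   bootstrap.servers=localhost:9092
    //   key.serializer=org.apache.kafka.common.serialization.StringSerializer
    //   value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
    //   kafka.topic=wayang-metrics
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    import java.util.Arrays;

    public class ListenerWiringSketch {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf()
                    .setAppName("wayang-monitoring-sketch")
                    .setMaster("local[*]")
                    .set("spark.extraListeners",
                            "org.apache.wayang.spark.monitoring.metrics.SparkListener");
            try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
                // Any action will generate job/stage/task events for the listener.
                jsc.parallelize(Arrays.asList(1, 2, 3)).count();
            }
        }
    }
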
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/StageCompleted.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/StageCompleted.java
deleted file mode 100644
index 295e6db0..00000000
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/StageCompleted.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.wayang.spark.monitoring.metrics;
-
-import org.apache.wayang.spark.monitoring.interfaces.SerializableObject;
-import org.apache.wayang.spark.monitoring.interfaces.Stage;
-import org.apache.wayang.spark.monitoring.interfaces.Task;
-
-import java.util.List;
-/**
- * Represents a completed stage in a distributed computing system.
- *
- * This class implements the Stage interface and SerializableObject interface.
- *
- * The completed stage contains the following information:
- * - The ID of the stage
- * - The number of tasks in the stage
- * - The name of the stage
- * - The status of the stage
- * - The details of the stage
- * - The ID of the executor that executed the stage
- * - The stage attempt ID
- * - The task metric for the stage
- * - The list of tasks for the stage
- * - The event name for the stage
- * - The stage completion time
- *
- * This class provides methods to get and set the above information.
- */
-public class StageCompleted implements Stage, SerializableObject {
-    private int id;
-    private int tasks;
-    private String stageName;
-    private String status;
-    private String details;
-    private String executorID;
-    private int stateAttempt;
-    private TaskMetric taskMetric;
-    private List<Task> listOfTasks;
-    private String eventName;
-    private long stageCompletionTime;
-
-    @Override
-    public void setTaskMetric(TaskMetric taskMetric) {
-        this.taskMetric=taskMetric;
-    }
-
-
-    @Override
-    public void setExecutorID(String ID) {
-        this.executorID=ID;
-    }
-
-    @Override
-    public String getExecutorID() {
-        return executorID;
-    }
-
-    @Override
-    public void setStageAttemptId(int id) {
-        this.id=id;
-    }
-
-    @Override
-    public int getStageAttemptId() {
-        return id;
-    }
-
-    @Override
-    public void setListOfTasks(List<Task> tasks) {
-        this.listOfTasks=tasks;
-    }
-
-    @Override
-    public List<Task> getListOfTasks() {
-        return listOfTasks;
-    }
-
-
-    @Override
-    public void setEventame(String name) {
-        this.eventName=name;
-    }
-
-    @Override
-    public String getEventName() {
-        return eventName;
-    }
-
-
-    @Override
-    public void setID(int ID) {
-        this.id=ID;
-    }
-
-    @Override
-    public int getID() {
-        return id;
-    }
-
-    @Override
-    public void setNumberOfTasks(int tasks) {
-        this.tasks=tasks;
-    }
-
-    @Override
-    public int getNumberOfTasks() {
-        return tasks;
-    }
-
-    @Override
-    public void setStageName(String name) {
-        this.stageName=name;
-    }
-
-    @Override
-    public String getStageName() {
-        return stageName;
-    }
-
-    @Override
-    public void setStatus(String Status) {
-        this.status=Status;
-    }
-
-    @Override
-    public String getStatus() {
-        return status;
-    }
-
-    @Override
-    public void setDetails(String details) {
-        this.details=details;
-    }
-
-    @Override
-    public String getDetails() {
-        return details;
-    }
-
-    @Override
-    public void setSubmissionTime(long time) {
-
-
-    }
-
-    @Override
-    public long getSubmissionTime() {
-        return 0;
-    }
-
-    @Override
-    public void setCompletionTime(long time) {
-            this.stageCompletionTime=time;
-    }
-
-    @Override
-    public long getCompletionTime() {
-        return stageCompletionTime;
-    }
-
-    @Override
-    public TaskMetric getTaskMetric() {
-        return taskMetric;
-    }
-}
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/StageExecutorMetrics.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/StageExecutorMetrics.java
deleted file mode 100644
index 25497fb3..00000000
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/StageExecutorMetrics.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.wayang.spark.monitoring.metrics;
-
-import org.apache.wayang.spark.monitoring.interfaces.SerializableObject;
-import org.apache.wayang.spark.monitoring.interfaces.Stage;
-import org.apache.wayang.spark.monitoring.interfaces.Task;
-
-import java.util.List;
-/**
- * The StageExecutorMetrics class implements the Stage and SerializableObject interfaces and represents
- * the metrics associated with a stage executed by an executor.
- */
-
-public class StageExecutorMetrics implements Stage, SerializableObject {
-    private int id;
-    private int tasks;
-    private String stageName;
-    private String status;
-    private String details;
-    private String executorID;
-    private int stateAttempt;
-    private List<Task> listOfTasks;
-
-    public TaskMetric getTaskMetric() {
-        return taskMetric;
-    }
-
-    @Override
-    public void setTaskMetric(TaskMetric taskMetric) {
-        this.taskMetric =  taskMetric;
-    }
-
-
-    @Override
-    public void setExecutorID(String ID) {
-        this.executorID=ID;
-    }
-
-    @Override
-    public String getExecutorID() {
-        return executorID;
-    }
-
-    @Override
-    public void setStageAttemptId(int id) {
-        this.id=id;
-    }
-
-    @Override
-    public int getStageAttemptId() {
-        return id;
-    }
-
-    @Override
-    public void setListOfTasks(List<Task> tasks) {
-        this.listOfTasks=tasks;
-    }
-
-    @Override
-    public List<Task> getListOfTasks() {
-        return listOfTasks;
-    }
-
-    private TaskMetric taskMetric;
-
-    private String eventName;
-
-    @Override
-    public void setEventame(String name) {
-        this.eventName=name;
-    }
-
-    @Override
-    public String getEventName() {
-        return eventName;
-    }
-
-
-    @Override
-    public void setID(int ID) {
-        this.id=ID;
-    }
-
-    @Override
-    public int getID() {
-        return id;
-    }
-
-    @Override
-    public void setNumberOfTasks(int tasks) {
-        this.tasks=tasks;
-    }
-
-    @Override
-    public int getNumberOfTasks() {
-        return tasks;
-    }
-
-    @Override
-    public void setStageName(String name) {
-        this.stageName=name;
-    }
-
-    @Override
-    public String getStageName() {
-        return stageName;
-    }
-
-    @Override
-    public void setStatus(String Status) {
-        this.status=Status;
-    }
-
-    @Override
-    public String getStatus() {
-        return status;
-    }
-
-    @Override
-    public void setDetails(String details) {
-        this.details=details;
-    }
-
-    @Override
-    public String getDetails() {
-        return details;
-    }
-
-    @Override
-    public void setSubmissionTime(long time) {
-
-    }
-
-    @Override
-    public long getSubmissionTime() {
-        return 0;
-    }
-
-    @Override
-    public void setCompletionTime(long time) {
-
-    }
-
-    @Override
-    public long getCompletionTime() {
-        return 0;
-    }
-}
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/StageSubmitted.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/StageSubmitted.java
deleted file mode 100644
index 4729e77c..00000000
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/StageSubmitted.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.wayang.spark.monitoring.metrics;
-
-import org.apache.wayang.spark.monitoring.interfaces.SerializableObject;
-import org.apache.wayang.spark.monitoring.interfaces.Stage;
-import org.apache.wayang.spark.monitoring.interfaces.Task;
-
-import java.util.List;
-/**
-
- * The StageSubmitted class implements the Stage and SerializableObject interfaces to represent a submitted stage in a distributed system.
- * It contains information about the stage's ID, number of tasks, name, status, details, executor ID, list of tasks, stage attempt ID, stage submission time,
- * and task metric. It also allows the setting and getting of each of these properties through various interface methods.
- */
-public class StageSubmitted implements Stage, SerializableObject {
-    private int id;
-    private int tasks;
-    private String stageName;
-    private String status;
-    private String details;
-    private String executorID;
-    private List<Task> listOfTasks;
-    private int stageAttemptID;
-    private long stageSubmissionTime;
-    public TaskMetric getTaskMetric() {
-        return taskMetric;
-    }
-
-    public void setTaskMetric(TaskMetric taskMetric) {
-        this.taskMetric = taskMetric;
-    }
-
-    @Override
-    public void setExecutorID(String ID) {
-        this.executorID=ID;
-    }
-
-    @Override
-    public String getExecutorID() {
-        return executorID;
-    }
-
-    @Override
-    public void setStageAttemptId(int id) {
-        this.stageAttemptID=id;
-    }
-
-    @Override
-    public int getStageAttemptId() {
-        return id;
-    }
-
-    @Override
-    public void setListOfTasks(List<Task> tasks) {
-        this.listOfTasks= tasks;
-    }
-
-    @Override
-    public List<Task> getListOfTasks() {
-        return listOfTasks;
-    }
-
-    private TaskMetric taskMetric;
-    private String eventName;
-
-    @Override
-    public void setEventame(String name) {
-        this.eventName=name;
-    }
-
-    @Override
-    public String getEventName() {
-        return eventName;
-    }
-
-
-    @Override
-    public void setID(int ID) {
-        this.id=ID;
-    }
-
-    @Override
-    public int getID() {
-        return id;
-    }
-
-    @Override
-    public void setNumberOfTasks(int tasks) {
-        this.tasks=tasks;
-    }
-
-    @Override
-    public int getNumberOfTasks() {
-        return tasks;
-    }
-
-    @Override
-    public void setStageName(String name) {
-        this.stageName=name;
-    }
-
-    @Override
-    public String getStageName() {
-        return stageName;
-    }
-
-    @Override
-    public void setStatus(String Status) {
-        this.status=Status;
-    }
-
-    @Override
-    public String getStatus() {
-        return status;
-    }
-
-    @Override
-    public void setDetails(String details) {
-        this.details=details;
-    }
-
-    @Override
-    public String getDetails() {
-        return details;
-    }
-
-    @Override
-    public void setSubmissionTime(long time) {
-        this.stageSubmissionTime=time;
-    }
-
-    @Override
-    public long getSubmissionTime() {
-        return this.stageSubmissionTime;
-    }
-
-    @Override
-    public void setCompletionTime(long time) {
-
-    }
-
-    @Override
-    public long getCompletionTime() {
-        return 0;
-    }
-}
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/TaskEnd.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/TaskEnd.java
deleted file mode 100644
index 7dc3e281..00000000
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/TaskEnd.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.wayang.spark.monitoring.metrics;
-
-import org.apache.wayang.spark.monitoring.interfaces.SerializableObject;
-import org.apache.wayang.spark.monitoring.interfaces.Task;
-/**
- * Represents the end status of a task execution.
- *
- * <p>Implementing the {@link Task} interface, this class provides methods to set and get the
- * attributes of a task, such as its ID, host IP, launch and finish times, and status. It also
- * implements the {@link SerializableObject} interface to allow serialization of the class.
- *
- * <p>The class also includes a {@link TaskMetric} object to store metrics related to the task,
- * and a {@link TaskStatusForRunning} object to provide information about the task's status during
- * execution.
- *
- * @author [Adeel Aslam]
- */
-public class TaskEnd implements Task, SerializableObject {
-    private String id;
-    private String hostIP;
-    private long taskId;
-    private String executorID;
-    private String taskStatus;
-    private int Index;
-    private int partition;
-    private long launchTime;
-    private long durationTime;
-    private long finishTime;
-    private long gettingResultTime;
-    private boolean status;
-    private int stageID;
-
-    private String eventName;
-    TaskMetric taskMetric;
-
-    TaskStatusForRunning taskStatusForRunning=null;
-
-    @Override
-    public void setEventame(String name) {
-        this.eventName=name;
-    }
-
-    @Override
-    public String getEventName() {
-        return eventName;
-    }
-
-
-    @Override
-    public void setID(String id) {
-        this.id=id;
-    }
-
-    @Override
-    public String getID() {
-        return id;
-    }
-    @Override
-    public void setHostIP(String Ip) {
-        this.hostIP=Ip;
-    }
-
-    @Override
-    public String getHostIP() {
-        return hostIP;
-    }
-
-    @Override
-    public void setTaskID(long taskId) {
-        this.taskId=taskId;
-    }
-
-    @Override
-    public void setStageID(int id) {
-        this.stageID=id;
-    }
-
-    @Override
-    public int getStageID() {
-        return stageID;
-    }
-
-    @Override
-    public long getTaskID() {
-        return taskId;
-    }
-
-    @Override
-    public void setStringExecutorID(String executorID) {
-        this.executorID=executorID;
-    }
-
-    @Override
-    public String getExecutorID() {
-        return executorID;
-    }
-
-    @Override
-    public void setTaskStatus(String status) {
-        this.taskStatus=status;
-    }
-
-    @Override
-    public String getTaskStatus() {
-        return taskStatus;
-    }
-
-    @Override
-    public void setIndex(int index) {
-        this.Index= index;
-
-    }
-
-    @Override
-    public int getIndex() {
-        return Index;
-    }
-
-    @Override
-    public void setPartition(int partition) {
-        this.partition=partition;
-    }
-
-    @Override
-    public int getPartition() {
-        return partition;
-    }
-
-    @Override
-    public void setLaunchTime(long time) {
-        this.launchTime=time;
-    }
-
-    @Override
-    public long getLaunchTime() {
-        return launchTime;
-    }
-
-    @Override
-    public void setFinishTime(long time) {
-        this.finishTime=time;
-    }
-
-    @Override
-    public long getFinishTime() {
-        return finishTime;
-    }
-
-    @Override
-    public void setGettingTime(long time) {
-        this.gettingResultTime=time;
-    }
-
-    @Override
-    public long getGettingTime() {
-        return gettingResultTime;
-    }
-
-    @Override
-    public void setDurationTime(long time) {
-        this.durationTime=time;
-    }
-
-    @Override
-    public long getDurationTime() {
-        return durationTime;
-    }
-
-
-    @Override
-    public void setTaskStatus(boolean status) {
-        this.status=status;
-    }
-
-    @Override
-    public boolean getTaskSatus() {
-        return status;
-    }
-
-    @Override
-    public void setTaskStatusForRunning(TaskStatusForRunning taskStatusForRunning) {
-        this.taskStatusForRunning=taskStatusForRunning;
-    }
-
-    @Override
-    public TaskStatusForRunning getTaskStatusForRunning() {
-        return taskStatusForRunning;
-    }
-
-    @Override
-    public TaskMetric getTaskMetric() {
-        return taskMetric;
-    }
-
-    @Override
-    public void setTaskMetric(TaskMetric taskMetric) {
-        this.taskMetric=taskMetric;
-
-    }
-
-
-}
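
    Because onTaskEnd in the listener publishes a java-serialized ArrayList of these task beans
    under the record key "TaskEnd", a downstream reader needs a byte-array deserializer plus an
    ObjectInputStream. The sketch below is one possible consumer; the broker address, group id
    and topic name are placeholders, not values taken from this patch.

    // Sketch of a downstream consumer for the java-serialized task lists; illustrative only.
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    import java.io.ByteArrayInputStream;
    import java.io.ObjectInputStream;
    import java.time.Duration;
    import java.util.Collections;
    import java.util.List;
    import java.util.Properties;

    public class TaskEndConsumerSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumed broker
            props.put("group.id", "wayang-monitoring-sketch"); // placeholder group id
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("wayang-metrics")); // assumed topic
                ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(5));
                for (ConsumerRecord<String, byte[]> record : records) {
                    if (!"TaskEnd".equals(record.key())) {
                        continue; // other record keys carry job/stage/executor events
                    }
                    try (ObjectInputStream ois =
                                 new ObjectInputStream(new ByteArrayInputStream(record.value()))) {
                        List<?> tasks = (List<?>) ois.readObject();
                        System.out.println("Received " + tasks.size() + " task events");
                    }
                }
            }
        }
    }
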
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/TaskGettingResult.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/TaskGettingResult.java
deleted file mode 100644
index 2318e2c0..00000000
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/TaskGettingResult.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.wayang.spark.monitoring.metrics;
-
-import org.apache.wayang.spark.monitoring.interfaces.SerializableObject;
-import org.apache.wayang.spark.monitoring.interfaces.Task;
-/**
- * Represents the getting results of a task execution.
- *
- * <p>Implementing the {@link Task} interface, this class provides methods to set and get the
- * attributes of a task, such as its ID, host IP, launch and finish times, and status. It also
- * implements the {@link SerializableObject} interface to allow serialization of the class.
- *
- * <p>The class also includes a {@link TaskMetric} object to store metrics related to the task,
- * and a {@link TaskStatusForRunning} object to provide information about the task's status during
- * execution.
- *
- * @author [Adeel Aslam]
- */
-public class TaskGettingResult implements Task, SerializableObject {
-    private String id;
-    private String hostIP;
-    private long taskId;
-    private String executorID;
-    private String taskStatus;
-    private int Index;
-    private int partition;
-    private long launchTime;
-    private long durationTime;
-    private long finishTime;
-    private long gettingResultTime;
-    private boolean status;
-    private int stageID;
-
-    private String eventName;
-
-    TaskStatusForRunning taskStatusForRunning=null;
-
-    @Override
-    public void setEventame(String name) {
-        this.eventName=name;
-    }
-
-    @Override
-    public String getEventName() {
-        return eventName;
-    }
-
-
-    @Override
-    public void setID(String id) {
-        this.id=id;
-    }
-
-    @Override
-    public String getID() {
-        return id;
-    }
-    @Override
-    public void setHostIP(String Ip) {
-        this.hostIP=Ip;
-    }
-
-    @Override
-    public String getHostIP() {
-        return hostIP;
-    }
-
-    @Override
-    public void setTaskID(long taskId) {
-        this.taskId=taskId;
-    }
-
-    @Override
-    public void setStageID(int id) {
-        this.stageID=id;
-    }
-
-    @Override
-    public int getStageID() {
-        return stageID;
-    }
-
-    @Override
-    public long getTaskID() {
-        return taskId;
-    }
-
-    @Override
-    public void setStringExecutorID(String executorID) {
-        this.executorID=executorID;
-    }
-
-    @Override
-    public String getExecutorID() {
-        return executorID;
-    }
-
-    @Override
-    public void setTaskStatus(String status) {
-        this.taskStatus=status;
-    }
-
-    @Override
-    public String getTaskStatus() {
-        return taskStatus;
-    }
-
-    @Override
-    public void setIndex(int index) {
-        this.Index= index;
-
-    }
-
-    @Override
-    public int getIndex() {
-        return Index;
-    }
-
-    @Override
-    public void setPartition(int partition) {
-        this.partition=partition;
-    }
-
-    @Override
-    public int getPartition() {
-        return partition;
-    }
-
-    @Override
-    public void setLaunchTime(long time) {
-        this.launchTime=time;
-    }
-
-    @Override
-    public long getLaunchTime() {
-        return launchTime;
-    }
-
-    @Override
-    public void setFinishTime(long time) {
-        this.finishTime=time;
-    }
-
-    @Override
-    public long getFinishTime() {
-        return finishTime;
-    }
-
-    @Override
-    public void setGettingTime(long time) {
-        this.gettingResultTime=time;
-    }
-
-    @Override
-    public long getGettingTime() {
-        return gettingResultTime;
-    }
-
-    @Override
-    public void setDurationTime(long time) {
-        this.durationTime=time;
-    }
-
-    @Override
-    public long getDurationTime() {
-        return durationTime;
-    }
-
-
-    @Override
-    public void setTaskStatus(boolean status) {
-        this.status=status;
-    }
-
-    @Override
-    public boolean getTaskSatus() {
-        return status;
-    }
-
-    @Override
-    public void setTaskStatusForRunning(TaskStatusForRunning taskStatusForRunning) {
-        this.taskStatusForRunning=taskStatusForRunning;
-    }
-
-    @Override
-    public TaskStatusForRunning getTaskStatusForRunning() {
-        return taskStatusForRunning;
-    }
-
-    @Override
-    public TaskMetric getTaskMetric() {
-        return null;
-    }
-
-    @Override
-    public void setTaskMetric(TaskMetric taskMetric) {
-
-    }
-
-
-}
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/TaskMetric.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/TaskMetric.java
deleted file mode 100644
index 5ce05314..00000000
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/TaskMetric.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.wayang.spark.monitoring.metrics;
-
-import org.apache.wayang.spark.monitoring.interfaces.SerializableObject;
-/**
-
- This class represents the metrics for a task in the Apache Wayang monitoring system.
-
- It implements the TaskMetric interface and SerializableObject interface to allow for serialization.
- */
-public class TaskMetric  implements org.apache.wayang.spark.monitoring.interfaces.TaskMetric, SerializableObject {
-    public long getExecutorCPUTime() {
-        return executorCPUTime;
-    }
-
-    public void setExecutorCPUTime(long executorCPUTime) {
-        this.executorCPUTime = executorCPUTime;
-    }
-
-    private long executorCPUTime;
-    private long bytesRead;
-    private long executorDeserializeCpuTime;
-    private  long executorDeserializeTime;
-    private long DiskByteSpilled;
-    private long time;
-    private long JVMGCTime;
-    private long peakExecutionMemory;
-    private long resultSize;
-    private long resultSerializationTime;
-    private long recordsWritten;
-    private  long bytesWritten;
-    @Override
-    public void setBytesRead(long bytesRead) {
-        this.bytesRead=bytesRead;
-    }
-
-    @Override
-    public long getByteRead() {
-        return bytesRead;
-    }
-
-    @Override
-    public void setExecutorDeserializeCpuTime(long executorDeserializeCpuTime) {
-        this.executorDeserializeCpuTime=executorDeserializeCpuTime;
-    }
-
-    @Override
-    public long getExecutorDeserializeCpuTime() {
-        return executorDeserializeCpuTime;
-    }
-
-    @Override
-    public void setExecutorDeserializeTime(long executorDeserializeTime) {
-        this.executorDeserializeTime=executorDeserializeTime;
-    }
-
-    @Override
-    public long getExecutorDeserializeTime() {
-        return executorDeserializeTime;
-    }
-
-    @Override
-    public void setDiskBytesSpilled(long DiskByteSpilled) {
-        this. DiskByteSpilled=DiskByteSpilled;
-    }
-
-    @Override
-    public long getDiskBytesSpilled() {
-        return DiskByteSpilled;
-    }
-
-    @Override
-    public void setExecutorRunTime(long time) {
-        this.time=time;
-    }
-
-    @Override
-    public long getexecutorRunTime() {
-        return time;
-    }
-
-    @Override
-    public void setjvmGCTime(long time) {
-        this.JVMGCTime=time;
-    }
-
-    @Override
-    public long getJVMGCTime() {
-        return JVMGCTime;
-    }
-
-    @Override
-    public void setPeakExecutionMemory(long peakExecutionMemory) {
-        this. peakExecutionMemory=peakExecutionMemory;
-    }
-
-    @Override
-    public long getPeakExecutionMemory() {
-        return peakExecutionMemory;
-    }
-
-    @Override
-    public void setResultSize(long resultSize) {
-        this. resultSize=resultSize;
-    }
-
-    @Override
-    public long getResultSize() {
-        return resultSize;
-    }
-
-    @Override
-    public void setResultSerializationTime(long resultSerializationTime) {
-        this. resultSerializationTime=resultSerializationTime;
-    }
-
-    @Override
-    public long getResultSerializationTime() {
-        return resultSerializationTime;
-    }
-
-    @Override
-    public void setRecordsWritten(long recordsWritten) {
-        this.recordsWritten= recordsWritten;
-    }
-
-    @Override
-    public long getRecordsWrittern() {
-        return recordsWritten;
-    }
-
-    @Override
-    public void setBytesWritten(long bytesWritten) {
-        this.bytesWritten=bytesWritten;
-    }
-
-    @Override
-    public long getBytesWrittern() {
-        return bytesWritten;
-    }
-}
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/TaskStart.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/TaskStart.java
deleted file mode 100644
index f4907cc0..00000000
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/monitoring/metrics/TaskStart.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.wayang.spark.monitoring.metrics;
-import org.apache.wayang.spark.monitoring.interfaces.SerializableObject;
-import org.apache.wayang.spark.monitoring.interfaces.Task;
-/**
- * Represents the start status of a task execution.
- *
- * <p>Implementing the {@link Task} interface, this class provides methods to set and get the
- * attributes of a task, such as its ID, host IP, launch and finish times, and status. It also
- * implements the {@link SerializableObject} interface to allow serialization of the class.
- *
- * <p>The class also includes a {@link TaskMetric} object to store metrics related to the task,
- * and a {@link TaskStatusForRunning} object to provide information about the task's status during
- * execution.
- *
- * @author [Adeel Aslam]
- */
-public class TaskStart implements Task, SerializableObject {
-    private String id;
-    private String hostIP;
-    private long taskId;
-    private String executorID;
-    private String taskStatus;
-    private int Index;
-    private int partition;
-    private long launchTime;
-    private long durationTime;
-    private long finishTime;
-    private long gettingResultTime;
-    private boolean status;
-    private int stageID;
-
-    private String eventName;
-
-    TaskStatusForRunning taskStatusForRunning=null;
-
-    @Override
-    public void setEventame(String name) {
-        this.eventName=name;
-    }
-
-    @Override
-    public String getEventName() {
-        return eventName;
-    }
-
-
-    @Override
-    public void setID(String id) {
-        this.id=id;
-    }
-
-    @Override
-    public String getID() {
-        return id;
-    }
-    @Override
-    public void setHostIP(String Ip) {
-        this.hostIP=Ip;
-    }
-
-    @Override
-    public String getHostIP() {
-        return hostIP;
-    }
-
-    @Override
-    public void setTaskID(long taskId) {
-        this.taskId=taskId;
-    }
-
-    @Override
-    public void setStageID(int id) {
-        this.stageID=id;
-    }
-
-    @Override
-    public int getStageID() {
-        return stageID;
-    }
-
-    @Override
-    public long getTaskID() {
-        return taskId;
-    }
-
-    @Override
-    public void setStringExecutorID(String executorID) {
-        this.executorID=executorID;
-    }
-
-    @Override
-    public String getExecutorID() {
-        return executorID;
-    }
-
-    @Override
-    public void setTaskStatus(String status) {
-        this.taskStatus=status;
-    }
-
-    @Override
-    public String getTaskStatus() {
-        return taskStatus;
-    }
-
-    @Override
-    public void setIndex(int index) {
-        this.Index= index;
-
-    }
-
-    @Override
-    public int getIndex() {
-        return Index;
-    }
-
-    @Override
-    public void setPartition(int partition) {
-        this.partition=partition;
-    }
-
-    @Override
-    public int getPartition() {
-        return partition;
-    }
-
-    @Override
-    public void setLaunchTime(long time) {
-        this.launchTime=time;
-    }
-
-    @Override
-    public long getLaunchTime() {
-        return launchTime;
-    }
-
-    @Override
-    public void setFinishTime(long time) {
-        this.finishTime=time;
-    }
-
-    @Override
-    public long getFinishTime() {
-        return finishTime;
-    }
-
-    @Override
-    public void setGettingTime(long time) {
-        this.gettingResultTime=time;
-    }
-
-    @Override
-    public long getGettingTime() {
-        return gettingResultTime;
-    }
-
-    @Override
-    public void setDurationTime(long time) {
-        this.durationTime=time;
-    }
-
-    @Override
-    public long getDurationTime() {
-        return durationTime;
-    }
-
-
-    @Override
-    public void setTaskStatus(boolean status) {
-        this.status=status;
-    }
-
-    @Override
-    public boolean getTaskSatus() {
-        return status;
-    }
-
-    @Override
-    public void setTaskStatusForRunning(TaskStatusForRunning taskStatusForRunning) {
-        this.taskStatusForRunning=taskStatusForRunning;
-    }
-
-    @Override
-    public TaskStatusForRunning getTaskStatusForRunning() {
-        return taskStatusForRunning;
-    }
-
-    @Override
-    public TaskMetric getTaskMetric() {
-        return null;
-    }
-
-    @Override
-    public void setTaskMetric(TaskMetric taskMetric) {
-
-    }
-
-
-}
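
For illustration only (not part of this patch): the deleted TaskStart and TaskGettingResult beans above are plain setter/getter holders that a Spark scheduler listener fills in from its callbacks. A minimal, hypothetical sketch of that callback pattern against Spark's public listener API; the class name TaskEndLogger is made up and the removed Wayang interfaces are intentionally not referenced:

    import org.apache.spark.executor.TaskMetrics;
    import org.apache.spark.scheduler.SparkListener;
    import org.apache.spark.scheduler.SparkListenerTaskEnd;
    import org.apache.spark.scheduler.TaskInfo;

    // Hypothetical sketch: pull task-level data from Spark's listener callback,
    // i.e. the same data the deleted beans above were designed to carry.
    public class TaskEndLogger extends SparkListener {
        @Override
        public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
            TaskInfo info = taskEnd.taskInfo();
            TaskMetrics metrics = taskEnd.taskMetrics(); // can be null for some task failures
            if (info == null || metrics == null) {
                return;
            }
            System.out.printf(
                    "task=%d stage=%d executor=%s host=%s launch=%d finish=%d cpuNs=%d gcMs=%d peakMem=%d%n",
                    info.taskId(), taskEnd.stageId(), info.executorId(), info.host(),
                    info.launchTime(), info.finishTime(),
                    metrics.executorCpuTime(), metrics.jvmGCTime(), metrics.peakExecutionMemory());
        }
    }

Such a listener is registered through sparkContext.sc().addSparkListener(...), which is the same hook the SparkPlatform code below stops using.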
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/platform/SparkPlatform.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/platform/SparkPlatform.java
index c2141ee0..471275ab 100644
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/platform/SparkPlatform.java
+++ b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/platform/SparkPlatform.java
@@ -33,7 +33,6 @@ import org.apache.wayang.core.platform.Platform;
 import org.apache.wayang.core.types.DataSetType;
 import org.apache.wayang.core.util.Formats;
 import org.apache.wayang.core.util.ReflectionUtils;
-import org.apache.wayang.spark.monitoring.metrics.SparkListener;
 import org.apache.wayang.spark.execution.SparkContextReference;
 import org.apache.wayang.spark.execution.SparkExecutor;
 import org.apache.wayang.spark.operators.SparkCollectionSource;
@@ -54,7 +53,6 @@ public class SparkPlatform extends Platform {
     private static final String CONFIG_NAME = "spark";
 
     private static final String DEFAULT_CONFIG_FILE = "wayang-spark-defaults.properties";
-    private static final String MONITORING_METRICS_FILE="wayang-spark-metrics.properties";
 
     public static final String INITIALIZATION_MS_CONFIG_KEY = "wayang.spark.init.ms";
 
@@ -85,8 +83,12 @@ public class SparkPlatform extends Platform {
             "spark.io.compression.codec",
             "spark.driver.memory",
             "spark.executor.heartbeatInterval",
-            "spark.network.timeout"
-            // "spark.extraListeners"
+            "spark.network.timeout",
+    };
+
+    private static final String[] OPTIONAL_HADOOP_PROPERTIES = {
+        "fs.s3.awsAccessKeyId",
+        "fs.s3.awsSecretAccessKey"
     };
 
     /**
@@ -114,12 +116,10 @@ public class SparkPlatform extends Platform {
      * @return a {@link SparkContextReference} wrapping the {@link JavaSparkContext}
      */
     public SparkContextReference getSparkContext(Job job) {
-        //System.out.println("In Job with "+job.isMontiorWithHackIT());
-        //System.exit(0);
+
         // NB: There must be only one JavaSparkContext per JVM. Therefore, it is not local to the executor.
         final SparkConf sparkConf;
         final Configuration configuration = job.getConfiguration();
-
         if (this.sparkContextReference != null && !this.sparkContextReference.isDisposed()) {
             final JavaSparkContext sparkContext = this.sparkContextReference.get();
             this.logger.warn(
@@ -127,8 +127,6 @@ public class SparkPlatform extends Platform {
                             "Not all settings might be effective.", 
sparkContext.getConf().get("spark.master"));
             sparkConf = sparkContext.getConf();
 
-
-
         } else {
             sparkConf = new SparkConf(true);
         }
@@ -145,17 +143,22 @@ public class SparkPlatform extends Platform {
         if (job.getName() != null) {
             sparkConf.set("spark.app.name", job.getName());
         }
-        // sparkConf.set("spark.extraListeners","org.apache.wayang.monitoring.spark.SparkListener");
+
         if (this.sparkContextReference == null || this.sparkContextReference.isDisposed()) {
             this.sparkContextReference = new SparkContextReference(job.getCrossPlatformExecutor(), new JavaSparkContext(sparkConf));
         }
         final JavaSparkContext sparkContext = this.sparkContextReference.get();
 
-        //SparkContext sc= sparkContext.sc();
-        if(job.isMontiorWithHackIT()) {
-            sparkConf.set("spark.extraListeners","org.apache.wayang.spark.monitoring.spark_monitoring.SparkListener");
-            sparkContext.sc().addSparkListener(new SparkListener());
+        org.apache.hadoop.conf.Configuration hadoopconf = sparkContext.hadoopConfiguration();
+        for (String property: OPTIONAL_HADOOP_PROPERTIES){
+            System.out.println(property);
+            configuration.getOptionalStringProperty(property).ifPresent(
+                value -> hadoopconf.set(property, value)
+            );
         }
+
+        // Set up the JAR files.
+        //sparkContext.clearJars();
         if (!sparkContext.isLocal()) {
             // Add Wayang JAR files.
             this.registerJarIfNotNull(ReflectionUtils.getDeclaringJar(SparkPlatform.class)); // wayang-spark
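
For illustration only (not part of this patch): the added hunk above copies the two optional S3 credentials from Wayang's Configuration into Spark's Hadoop configuration. A minimal, self-contained sketch of the same pattern, assuming a local Spark master and using a plain Map in place of org.apache.wayang.core.api.Configuration:

    import java.util.Map;
    import java.util.Optional;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    public class S3CredentialSketch {
        // Same keys as OPTIONAL_HADOOP_PROPERTIES in the hunk above.
        private static final String[] OPTIONAL_HADOOP_PROPERTIES = {
                "fs.s3.awsAccessKeyId",
                "fs.s3.awsSecretAccessKey"
        };

        public static void main(String[] args) {
            // Stand-in for Configuration.getOptionalStringProperty(..); the value is a dummy.
            Map<String, String> config = Map.of("fs.s3.awsAccessKeyId", "dummy-access-key");

            try (JavaSparkContext sparkContext =
                         new JavaSparkContext(new SparkConf(true).setMaster("local[1]").setAppName("sketch"))) {
                org.apache.hadoop.conf.Configuration hadoopConf = sparkContext.hadoopConfiguration();
                for (String property : OPTIONAL_HADOOP_PROPERTIES) {
                    // Only forward properties that were actually configured.
                    Optional.ofNullable(config.get(property))
                            .ifPresent(value -> hadoopConf.set(property, value));
                }
            }
        }
    }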
diff --git a/wayang-platforms/wayang-spark/code/main/resources/wayang-spark-kafka.properties b/wayang-platforms/wayang-spark/code/main/resources/wayang-spark-kafka.properties
deleted file mode 100644
index 7c66a686..00000000
--- a/wayang-platforms/wayang-spark/code/main/resources/wayang-spark-kafka.properties
+++ /dev/null
@@ -1,42 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# Kafka producer configuration properties
-
-# Set the broker address and port number(s)
-bootstrap.servers = localhost:9092
-
-# Set the number of acknowledgements the producer requires
-acks = all
-
-# Set the number of times to retry sending a message
-retries=0
-
-# Set the number of bytes to include in each batch of messages
-batch.size=16384
-
-# Set the amount of time (in milliseconds) the producer will wait before sending a batch of messages
-linger.ms=1
-
-# Set the amount of memory (in bytes) the producer can use to buffer messages
-buffer.memory=33554432
-
-# Set the serializer classes for key and value
-key.serializer=org.apache.kafka.common.serialization.StringSerializer
-value.serializer=org.apache.kafka.common.serialization.StringSerializer
-
-# Set the Kafka topic to produce messages to
-kafka.topic=Topic
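
For context only (not part of this patch): the deleted properties above configured the Kafka producer that the removed monitoring listener used to publish its events. A hypothetical sketch of how such a producer-properties resource is typically consumed; the record key and payload are illustrative:

    import java.io.InputStream;
    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class KafkaSinkSketch {
        public static void main(String[] args) throws Exception {
            // Load the producer settings from the (now removed) resource file.
            Properties props = new Properties();
            try (InputStream in = KafkaSinkSketch.class
                    .getResourceAsStream("/wayang-spark-kafka.properties")) {
                props.load(in);
            }
            // "kafka.topic" is an application-level key, not a Kafka client setting.
            String topic = props.getProperty("kafka.topic", "Topic");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>(topic, "stage-1", "{\"event\":\"StageCompleted\"}"));
                producer.flush();
            }
        }
    }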
diff --git a/wayang-platforms/wayang-spark/pom.xml b/wayang-platforms/wayang-spark/pom.xml
index 4a3f8077..1d2205ff 100644
--- a/wayang-platforms/wayang-spark/pom.xml
+++ b/wayang-platforms/wayang-spark/pom.xml
@@ -19,9 +19,9 @@
   -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
-    <packaging>pom</packaging>
+  <packaging>pom</packaging>
 
-    <parent>
+  <parent>
         <artifactId>wayang-platforms</artifactId>
         <groupId>org.apache.wayang</groupId>
         <version>0.6.1-SNAPSHOT</version>
@@ -66,33 +66,38 @@
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-common</artifactId>
-            <version>3.2.3</version>
+            <version>3.2.4</version>
         </dependency>
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-client</artifactId>
-            <version>2.7.7</version>
+            <version>3.1.2</version>
         </dependency>
         <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka-clients</artifactId>
-            <version>3.4.0</version>
+          <groupId>com.amazonaws</groupId>
+          <artifactId>aws-java-sdk-s3</artifactId>
+          <version>1.12.261</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-aws</artifactId>
+          <version>3.1.2</version>
         </dependency>
     </dependencies>
 
-    <modules>
-        <module>wayang-spark_2.12</module>
-    </modules>
+  <modules>
+    <module>wayang-spark_2.12</module>
+  </modules>
 
-    <profiles>
-        <profile>
-            <id>java8</id>
-            <activation>
-                <jdk>1.8</jdk>
-            </activation>
-            <modules>
-                <module>wayang-spark_2.11</module>
-            </modules>
-        </profile>
-    </profiles>
+  <profiles>
+    <profile>
+      <id>java8</id>
+      <activation>
+        <jdk>1.8</jdk>
+      </activation>
+      <modules>
+        <module>wayang-spark_2.11</module>
+      </modules>
+    </profile>
+  </profiles>
 </project>
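
For illustration only (not part of this patch): with hadoop-aws and aws-java-sdk-s3 now on the module's classpath, the credentials forwarded by OPTIONAL_HADOOP_PROPERTIES can be supplied through Wayang's configuration before a job runs. A hedged usage sketch, assuming the usual Configuration.setProperty(String, String) and WayangContext.withPlugin(..) entry points:

    import org.apache.wayang.core.api.Configuration;
    import org.apache.wayang.core.api.WayangContext;
    import org.apache.wayang.spark.Spark;

    public class S3JobSetupSketch {
        public static void main(String[] args) {
            // Keys match OPTIONAL_HADOOP_PROPERTIES in SparkPlatform; values come from the environment.
            Configuration configuration = new Configuration();
            configuration.setProperty("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"));
            configuration.setProperty("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"));

            WayangContext wayangContext = new WayangContext(configuration)
                    .withPlugin(Spark.basicPlugin());
            // ... build and execute a WayangPlan that reads its input from an s3:// URL ...
        }
    }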
