This is an automated email from the ASF dual-hosted git repository.

rpardomeza pushed a commit to branch WAYANG-agoraeo
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit 08966741184bbac5cd1a279f3dbc6bac4b4677f8
Author: rodrigopardomeza <[email protected]>
AuthorDate: Thu Feb 2 09:48:58 2023 +0100

    Read info from bands
---
 wayang-commons/wayang-basic/pom.xml                |   1 +
 wayang-plugins/wayang-agoraeo/pom.xml              |  15 ++
 .../main/java/org/apache/wayang/agoraeo/Main.java  |  99 ++-------
 .../org/apache/wayang/agoraeo/MakePatches.java     | 236 +++++++++++++++++++++
 .../iterators/IteratorSentinelDownload.java        |  12 +-
 .../spark/{Pepito.java => RunSen2Cor.java}         |  20 +-
 .../operators/spark/SparkSen2CorWrapper.java       |  18 +-
 .../org/apache/wayang/agoraeo/patches/Band.java    |  85 ++++++++
 .../wayang/agoraeo/patches/BandMetadata.java       |  17 ++
 .../apache/wayang/agoraeo/patches/L2a_file.java    |  45 ++++
 10 files changed, 439 insertions(+), 109 deletions(-)

diff --git a/wayang-commons/wayang-basic/pom.xml 
b/wayang-commons/wayang-basic/pom.xml
index 238cd311..e5deea33 100644
--- a/wayang-commons/wayang-basic/pom.xml
+++ b/wayang-commons/wayang-basic/pom.xml
@@ -61,6 +61,7 @@
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
         </dependency>
+
     </dependencies>
 
 </project>
diff --git a/wayang-plugins/wayang-agoraeo/pom.xml 
b/wayang-plugins/wayang-agoraeo/pom.xml
index 0c21da01..3f282abe 100644
--- a/wayang-plugins/wayang-agoraeo/pom.xml
+++ b/wayang-plugins/wayang-agoraeo/pom.xml
@@ -43,6 +43,21 @@
             <artifactId>hadoop-client</artifactId>
             <version>3.1.2</version>
         </dependency>
+        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.13</artifactId>
+            <version>3.3.0</version>
+            <scope>provided</scope>
+        </dependency>
+        <!-- https://mvnrepository.com/artifact/org.gdal/gdal -->
+        <dependency>
+            <groupId>org.gdal</groupId>
+            <artifactId>gdal</artifactId>
+            <version>3.6.0</version>
+            <type>pom</type>
+        </dependency>
+
     </dependencies>
 
     <properties>
diff --git 
a/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/Main.java
 
b/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/Main.java
index 5c7a27a6..87cf1c82 100644
--- 
a/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/Main.java
+++ 
b/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/Main.java
@@ -1,5 +1,7 @@
 package org.apache.wayang.agoraeo;
 
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
 import java.util.Arrays;
 import java.util.List;
 
@@ -21,9 +23,8 @@ public class Main {
 
         WayangContext wayangContext = new WayangContext();
         wayangContext.register(Java.basicPlugin());
-//        wayangContext.register(WayangAgoraEO.javaPlugin());
-        wayangContext.register(WayangAgoraEO.plugin());
         wayangContext.register(Spark.basicPlugin());
+        wayangContext.register(WayangAgoraEO.plugin());
 
         Configuration config = wayangContext.getConfiguration();
         
config.load(ReflectionUtils.loadResource(WayangAgoraEO.DEFAULT_CONFIG_FILE));
@@ -33,17 +34,23 @@ public class Main {
 
         System.out.println("Running AgoraEO!");
 
+        // TODO: Read all from config file
         Mirror m1 = new Mirror("https://scihub.copernicus.eu/dhus";, 
"rpardomeza", "12c124ccb2");
-        Mirror m2 = new Mirror("https://sentinels.space.noa.gr/dhus";, 
"greecerpardomeza", "12c124ccb2");
+        Mirror m2 = new Mirror("https://colhub.met.no";, "rpardomeza", 
"12c124ccb2");
 
         List<Mirror> mirrors = Arrays.asList(m1,m2);
 
-        // deberia ser un hashmap, con valores lista de orders con flatmap
-//        String order = "--from NOW-30DAY --to NOW --order 33UUU,33UWP";
-//
-        WayangPlan w = alternative2WayangPlan(mirrors, sen2cor, 
l2a_images_folder, 
"file:///Users/rodrigopardomeza/files/sen2cor-output-agoraeo.txt");
+        DateTimeFormatter dtf = 
DateTimeFormatter.ofPattern("yyyyMMdd-HH:mm:ss");
+        LocalDateTime now = LocalDateTime.now();
+        String sen2corlog = now.format(dtf);
+
+        WayangPlan w = alternative2WayangPlan(
+                mirrors,
+                sen2cor,
+                l2a_images_folder,
+                
"file:///Users/rodrigopardomeza/tu-berlin/agoraeo/agoraeo/sen2cor_logs/"+sen2corlog
+        );
 
-//        wayangContext.execute(w, 
ReflectionUtils.getDeclaringJar(Main.class), 
ReflectionUtils.getDeclaringJar(JavaPlatform.class));
         wayangContext.execute(w, ReflectionUtils.getDeclaringJar(Main.class), 
ReflectionUtils.getDeclaringJar(JavaPlatform.class), 
ReflectionUtils.getDeclaringJar(SparkPlatform.class));
 
     }
@@ -56,9 +63,9 @@ public class Main {
     ) {
 
         SentinelSource source = new SentinelSource()
-            .setFrom("NOW-90DAY")
+            .setFrom("NOW-30DAY")
             .setTo("NOW")
-            .setOrder(Arrays.asList("33UUU", "33UWP"))
+            .setOrder(Arrays.asList("33UWP", "32VNM"))
             .setMirrors(mirrors)
         ;
 
@@ -75,76 +82,4 @@ public class Main {
         return new WayangPlan(sink);
     }
 
-
-
-
-//    public static WayangPlan createWayangPlan(
-//            String cmd,
-//            String outputFileUrl
-//    ) {
-//
-//        IteratorSentinelDownload<File> iter = new 
FileIteratorSentinelDownload("Sentinel 2 - API", cmd);
-//
-//        /* Might replay the name of the downloaded file */
-//        SentinelSource<File> source = new SentinelSource<>(iter, File.class);
-//
-//        FlatMapOperator<File, String> files_lines = new FlatMapOperator<>(
-//                t -> {
-//                    try {
-//                        final InputStream inputStream = 
Files.newInputStream(t.toPath());
-//                        Stream<String> rough_lines = new BufferedReader(new 
InputStreamReader(inputStream)).lines();
-//                        return rough_lines::iterator;
-//                    } catch (IOException e) {
-//                        throw new WayangException(String.format("Reading %s 
failed.", t), e);
-//                    }
-//                },
-//                File.class,
-//                String.class
-//        );
-//        files_lines.setName("files giving lines");
-//
-//        FlatMapOperator<String, String> words = new FlatMapOperator<>(
-//                line -> Arrays.asList(line.split("\\W+")),
-//                String.class,
-//                String.class
-//        );
-//        words.setName("words");
-//
-//        // for each word transform it to lowercase and output a key-value 
pair (word, 1)
-//        MapOperator<String, Tuple2<String, Integer>> mapOperator = new 
MapOperator<>(
-//                new TransformationDescriptor<>(word -> new 
Tuple2<>(word.toLowerCase(), 1),
-//                        DataUnitType.createBasic(String.class),
-//                        DataUnitType.createBasicUnchecked(Tuple2.class)
-//                ), DataSetType.createDefault(String.class),
-//                DataSetType.createDefaultUnchecked(Tuple2.class)
-//        );
-//        mapOperator.setName("To lower case, add counter");
-//
-//
-//        // groupby the key (word) and add up the values (frequency)
-//        ReduceByOperator<Tuple2<String, Integer>, String> reduceByOperator = 
new ReduceByOperator<>(
-//                new TransformationDescriptor<>(pair -> pair.field0,
-//                        DataUnitType.createBasicUnchecked(Tuple2.class),
-//                        DataUnitType.createBasic(String.class)), new 
ReduceDescriptor<>(
-//                ((a, b) -> {
-//                    a.field1 += b.field1;
-//                    return a;
-//                }), DataUnitType.createGroupedUnchecked(Tuple2.class),
-//                DataUnitType.createBasicUnchecked(Tuple2.class)
-//        ), DataSetType.createDefaultUnchecked(Tuple2.class)
-//        );
-//        reduceByOperator.setName("Add counters");
-//
-//        TextFileSink<Tuple2> sink = new TextFileSink<>(outputFileUrl, 
Tuple2.class);
-//        sink.setName("Collect result");
-//
-//
-//        source.connectTo(0, files_lines, 0);
-//        files_lines.connectTo(0, words,0);
-//        words.connectTo(0, mapOperator, 0);
-//        mapOperator.connectTo(0, reduceByOperator, 0);
-//        reduceByOperator.connectTo(0, sink, 0);
-//
-//        return new WayangPlan(sink);
-//    }
 }
\ No newline at end of file
diff --git 
a/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/MakePatches.java
 
b/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/MakePatches.java
new file mode 100644
index 00000000..5e8dbe0b
--- /dev/null
+++ 
b/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/MakePatches.java
@@ -0,0 +1,236 @@
+package org.apache.wayang.agoraeo;
+
+import org.apache.wayang.agoraeo.patches.Band;
+import org.apache.wayang.agoraeo.patches.BandMetadata;
+import org.apache.wayang.agoraeo.patches.L2a_file;
+import org.apache.wayang.basic.operators.CollectionSource;
+import org.apache.wayang.basic.operators.FlatMapOperator;
+import org.apache.wayang.basic.operators.LocalCallbackSink;
+import org.apache.wayang.basic.operators.MapOperator;
+import org.apache.wayang.core.api.Configuration;
+import org.apache.wayang.core.api.WayangContext;
+import org.apache.wayang.core.api.exception.WayangException;
+import org.apache.wayang.core.plan.wayangplan.WayangPlan;
+import org.apache.wayang.core.util.ReflectionUtils;
+import org.apache.wayang.java.Java;
+import org.apache.wayang.java.platform.JavaPlatform;
+import org.apache.wayang.spark.Spark;
+import org.apache.wayang.spark.platform.SparkPlatform;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class MakePatches {
+
+    private static Map<String, String> resolutionMap;
+
+    public static void main(String[] args) {
+
+        resolutionMap = new HashMap<>();
+        resolutionMap.put("B01", "60");
+        resolutionMap.put("B02", "10");
+        resolutionMap.put("B03", "10");
+        resolutionMap.put("B04", "10");
+        resolutionMap.put("B05", "20");
+        resolutionMap.put("B06", "20");
+        resolutionMap.put("B07", "20");
+        resolutionMap.put("B08", "10");
+        resolutionMap.put("B8A", "20");
+        resolutionMap.put("B09", "60");
+        resolutionMap.put("B10", "60");
+        resolutionMap.put("B11", "20");
+        resolutionMap.put("B12", "20");
+        resolutionMap.put("TCI", "20");
+        resolutionMap.put("AOT", "20");
+        resolutionMap.put("WVP", "20");
+        resolutionMap.put("SCL", "60");
+
+        WayangContext wayangContext = new WayangContext();
+        wayangContext.register(Java.basicPlugin());
+        wayangContext.register(Spark.basicPlugin());
+        wayangContext.register(WayangAgoraEO.plugin());
+
+        Configuration config = wayangContext.getConfiguration();
+        
config.load(ReflectionUtils.loadResource(WayangAgoraEO.DEFAULT_CONFIG_FILE));
+
+        System.out.println("Running Patch Making Process!");
+
+        List<Band> result = new ArrayList<>();
+        WayangPlan w = createWayangPlan(args[0], result);
+
+
+        // Source with L2A Sentinel Files
+        /*
+        get_band_infos receive DataFrame "l2a_path_df" containing "l2a_uuid", 
"l2a_path" and "l1c_uuid" columns
+
+        Defines get_band_info:
+            l2a_path/GRANULE/%unique_folder%/IMG_DATA/
+            you have resolutions R10m, R20m and R60m folders
+            Go through all of them reading the jp2 files inside
+            Splitting the name of the jp2 file by "_", obtain [0]:utm, 
[2]:band_name, [3]:resolution(apply regex "^\d+")
+                confirm that the resolution is consistent with the band name
+                    "B01": 60,
+                    "B02": 10,
+                    "B03": 10,
+                    "B04": 10,
+                    "B05": 20,
+                    "B06": 20,
+                    "B07": 20,
+                    "B08": 10,
+                    "B8A": 20,
+                    "B09": 60,
+                    "B10": 60,
+                    "B11": 20,
+                    "B12": 20,
+                    "TCI": 20,
+                    "AOT": 20,
+                    "WVP": 20,
+                    "SCL": 60,
+                confirm the band name is different from exclude_bands=["TCI", 
"AOT", "WVP"]
+            Create Dataframe.Row(band_name=band_name, band_path=band_path, 
resolution=resolution, utm=utm)
+            fill Array of Rows appending each Row to an array "band_infos"
+            In pyspark uses functions.py udf(get_band_info, schema) to create 
a UDF from get_band_info that returns
+            schema
+               schema = T.ArrayType(
+                T.StructType(
+                    [
+                        T.StructField("band_name", T.StringType(), False),
+                        T.StructField("band_path", T.StringType(), False),
+                        T.StructField("resolution", T.ShortType(), False),
+                        T.StructField("utm", T.StringType(), False),
+                    ]
+                )
+            )
+            udf will retrieve an Array of Dataframe.Rows with the StructType 
schema
+            if wide: we pass a l2a_path to the UDF, and we explode the result 
(functions.py explode)
+                F.explode(find_band_path_udf("l2a_path"))
+                    I believe after reading about the function that the only 
difference is that
+                    will create a Dataframe of schema, instead of an Array of 
Schema
+
+            Projects a new Dataframe with Select. Per Row in the input 
Dataframe l2a_path_df, assigns a Row for the new
+            Dataframe
+                "l1c_uuid",
+                "l2a_uuid",
+                "l2a_path",
+                F.explode(find_band_path_udf("l2a_path")).alias("bands")
+            As explained before, I suspect that the column bands it's a 
Dataframe of Schema, where each element represents
+            a band with its band name, path, resolution and UTM.
+            * Using mode "narrow" would only pass the array of Rows, not a 
Dataframe
+         */
+
+
+        /**
+         * Load_patches receives "band_info_df" Dataframe: ("l1c_uuid", 
"l2a_uuid", "l2a_path", 
F.explode(find_band_path_udf("l2a_path")).alias("bands"))
+         * over band_info_df.rdd executes a flatmap(gdal_load_and_split)
+         *
+         * gdal_load_and_split(band_info_row): // Describes the treatment of 
each row l1c_uuid
+                Uses osgeo.gdal
+         *
+         */
+
+
+        wayangContext.execute(w, 
ReflectionUtils.getDeclaringJar(MakePatches.class), 
ReflectionUtils.getDeclaringJar(JavaPlatform.class), 
ReflectionUtils.getDeclaringJar(SparkPlatform.class));
+
+        for (Band res : result) {
+            System.out.println(res);
+        }
+    }
+
+    public static WayangPlan createWayangPlan(
+            String inputFileUrl,
+            List<Band> result) {
+
+        System.out.println(inputFileUrl);
+
+        CollectionSource<String> source = new 
CollectionSource<>(Arrays.asList(inputFileUrl.split(",")), String.class);
+
+        /*Create dataframe*/
+        MapOperator<String, L2a_file> l2a_catalog = new MapOperator<>(
+                t -> {
+                    // UUID are just generated there, so, do the same
+                    UUID l1c_uuid = UUID.randomUUID();
+                    UUID l2a_uuid = UUID.randomUUID();
+                    return new L2a_file(l2a_uuid.toString(), t, 
l1c_uuid.toString());
+                },
+                String.class,
+                L2a_file.class
+        );
+
+        /* TODO: BigEarthNet Pipeline */
+        FlatMapOperator<L2a_file, Band> bands = new FlatMapOperator<>(
+                t -> {
+                    List<File> paths = Stream.of(Objects.requireNonNull(new 
File(t.getL2a_path() + "/GRANULE").listFiles()))
+                            .filter(File::isDirectory)
+                            .collect(Collectors.toList());
+
+                    if(paths.size() > 1) {
+                        throw new WayangException("Granule is expected to have 
a unique folder inside");
+                    }
+
+                    String img_folder = paths.get(0).getAbsolutePath() + 
"/IMG_DATA";
+
+                    List<Band> bands_resolution = 
processResolution(img_folder, "R10m", t);
+                    bands_resolution.addAll(processResolution(img_folder, 
"R20m", t));
+                    bands_resolution.addAll(processResolution(img_folder, 
"R60m", t));
+
+                    return bands_resolution;
+                },
+                L2a_file.class,
+                Band.class
+        );
+
+        MapOperator<Band, BandMetadata> metadata = new MapOperator<>(t-> new 
BandMetadata(t), Band.class,BandMetadata.class);
+
+
+        LocalCallbackSink<Band> sink = 
LocalCallbackSink.createCollectingSink(result, Band.class);
+
+
+//        String outputFileUrl = 
"file:///Users/rodrigopardomeza/tu-berlin/agoraeo/agoraeo/outputs/patches.log";
+//        TextFileSink<String> sink = new TextFileSink<>(outputFileUrl, 
String.class);
+
+        source.connectTo(0, l2a_catalog,0);
+        l2a_catalog.connectTo(0, bands,0);
+        bands.connectTo(0, sink,0);
+//        source.connectTo(0,toL2A,0);
+//        toL2A.connectTo(0,sink,0);
+
+
+        return new WayangPlan(sink);
+    }
+
+    private static List<Band> processResolution(String img_folder, String 
resolution, L2a_file l2a_file) {
+
+        // Missing to filter only the bands relevant to each resolution
+        return Stream.of(Objects.requireNonNull(new File(img_folder + "/" + 
resolution).listFiles()))
+                .filter(file -> file.isFile() &&
+                        
(Objects.equals(file.getName().substring(file.getName().lastIndexOf(".")+1), 
"jp2")))
+                .map(t -> {
+                    String[] parts = t.getName().split("_");
+                    return new Band(
+                            l2a_file.getL1c_uuid(),
+                            l2a_file.getL2a_uuid(),
+                            l2a_file.getL2a_path(),
+                            parts[2],
+                            img_folder + "/" + resolution + "/" + t.getName(),
+                            parts[3].substring(0,2),
+                            parts[0]
+                    );
+
+                })
+                .filter(t -> !Arrays.asList("TCI", "AOT", 
"WVP").contains(t.getBand_name()))
+                .filter(t -> consistentResolutionMapping(t.getResolution(), 
t.getBand_name()))
+                .collect(Collectors.toList());
+    }
+
+    private static boolean consistentResolutionMapping(String resolution, 
String band_name) {
+
+        return Objects.equals(resolutionMap.get(band_name), resolution);
+    }
+
+}
\ No newline at end of file
diff --git 
a/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/iterators/IteratorSentinelDownload.java
 
b/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/iterators/IteratorSentinelDownload.java
index 77111cd3..e80c45dc 100644
--- 
a/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/iterators/IteratorSentinelDownload.java
+++ 
b/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/iterators/IteratorSentinelDownload.java
@@ -28,12 +28,12 @@ public abstract class IteratorSentinelDownload<Input> 
implements Iterator<Input>
         this.command[0] = python_location;
         this.command[1] = module_name;
         int position = 2;
-        System.out.println(
-                String.format(
-                        "first command: %s",
-                        Arrays.toString(this.command)
-                )
-        );
+//        System.out.println(
+//                String.format(
+//                        "first command: %s",
+//                        Arrays.toString(this.command)
+//                )
+//        );
         for (Map.Entry<String, String> param : params.entrySet()) {
             this.command[position] = String.format(format, param.getKey());
             this.command[position + 1] = param.getValue();
diff --git 
a/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/operators/spark/Pepito.java
 
b/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/operators/spark/RunSen2Cor.java
similarity index 61%
rename from 
wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/operators/spark/Pepito.java
rename to 
wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/operators/spark/RunSen2Cor.java
index 2f21e850..1eda9b56 100644
--- 
a/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/operators/spark/Pepito.java
+++ 
b/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/operators/spark/RunSen2Cor.java
@@ -1,19 +1,19 @@
 package org.apache.wayang.agoraeo.operators.spark;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.spark.api.java.function.FlatMapFunction;
 
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.Serializable;
+import java.io.*;
 import java.util.Iterator;
+import java.util.stream.Stream;
+import java.nio.file.*;
 
-public class Pepito implements Serializable, FlatMapFunction<String, String> {
+public class RunSen2Cor implements Serializable, FlatMapFunction<String, 
String> {
 
     private final String sen2cor;
     private final String l2a_location;
 
-    public Pepito(String sen2cor, String l2a_location) {
+    public RunSen2Cor(String sen2cor, String l2a_location) {
 
         this.sen2cor = sen2cor;
         this.l2a_location = l2a_location;
@@ -26,11 +26,19 @@ public class Pepito implements Serializable, 
FlatMapFunction<String, String> {
                     s + " " +
                     " --output_dir " + l2a_location;
             Process process = Runtime.getRuntime().exec(command);
+
+            // TODO: ErrorStream should be redirected here
+
             Iterator<String> input = new BufferedReader(
                     new InputStreamReader(
                             process.getInputStream()
                     )
             ).lines().iterator();
+
+//            File f = new 
File("/Users/rodrigopardomeza/tu-berlin/agoraeo/agoraeo/sen2cor_logs" + 
s.substring(0, s.length()-5) + ".log");
+//            Files.copy(l, f, StandardCopyOption.REPLACE_EXISTING);
+//            FileUtils.copyInputStreamToFile(l, f);
+
             return input;
         } catch (IOException e) {
             throw new RuntimeException(e);
diff --git 
a/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/operators/spark/SparkSen2CorWrapper.java
 
b/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/operators/spark/SparkSen2CorWrapper.java
index 860536d0..734f2e68 100644
--- 
a/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/operators/spark/SparkSen2CorWrapper.java
+++ 
b/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/operators/spark/SparkSen2CorWrapper.java
@@ -18,31 +18,18 @@
 
 package org.apache.wayang.agoraeo.operators.spark;
 
-import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.wayang.agoraeo.operators.basic.Sen2CorWrapper;
-import org.apache.wayang.core.function.FunctionDescriptor;
 import org.apache.wayang.core.optimizer.OptimizationContext.OperatorContext;
 import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
 import org.apache.wayang.core.platform.ChannelDescriptor;
 import org.apache.wayang.core.platform.ChannelInstance;
 import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
 import org.apache.wayang.core.util.Tuple;
-import org.apache.wayang.java.channels.CollectionChannel;
-import org.apache.wayang.java.channels.JavaChannelInstance;
-import org.apache.wayang.java.channels.StreamChannel;
-import org.apache.wayang.java.execution.JavaExecutor;
-import org.apache.wayang.java.operators.JavaExecutionOperator;
 import org.apache.wayang.spark.channels.RddChannel;
 import org.apache.wayang.spark.execution.SparkExecutor;
 import org.apache.wayang.spark.operators.SparkExecutionOperator;
 
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.Serializable;
 import java.util.*;
-import java.util.function.Function;
-import java.util.stream.StreamSupport;
 
 //TODO add the documentation and add the Profile Estimator
 public class SparkSen2CorWrapper
@@ -72,10 +59,11 @@ public class SparkSen2CorWrapper
       assert inputs.length == this.getNumInputs();
       assert outputs.length == this.getNumOutputs();
 
-      Pepito pepe = new Pepito(sen2cor, l2a_location);
+      RunSen2Cor s2c = new RunSen2Cor(sen2cor, l2a_location);
+
 
       ((RddChannel.Instance) outputs[0]).accept(
-              ((RddChannel.Instance) 
inputs[0]).<String>provideRdd().flatMap(pepe), sparkExecutor
+              ((RddChannel.Instance) 
inputs[0]).<String>provideRdd().flatMap(s2c), sparkExecutor
       );
 
       return ExecutionOperator.modelLazyExecution(inputs, outputs, 
operatorContext);
diff --git 
a/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/patches/Band.java
 
b/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/patches/Band.java
new file mode 100644
index 00000000..e9456090
--- /dev/null
+++ 
b/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/patches/Band.java
@@ -0,0 +1,85 @@
+package org.apache.wayang.agoraeo.patches;
+
+import java.io.Serializable;
+
+public class Band implements Serializable {
+
+    String l1c_uuid;
+    String l2a_uuid;
+    String l2a_path;
+    String band_name;
+    String band_path;
+    String resolution;
+    String utm;
+
+    public Band(String l1c_uuid, String l2a_uuid, String l2a_path, String 
band_name, String band_path, String resolution, String utm) {
+        this.l1c_uuid = l1c_uuid;
+        this.l2a_uuid = l2a_uuid;
+        this.l2a_path = l2a_path;
+        this.band_name = band_name;
+        this.band_path = band_path;
+        this.resolution = resolution;
+        this.utm = utm;
+    }
+
+    public String getL1c_uuid() {
+        return l1c_uuid;
+    }
+
+    public void setL1c_uuid(String l1c_uuid) {
+        this.l1c_uuid = l1c_uuid;
+    }
+
+    public String getL2a_uuid() {
+        return l2a_uuid;
+    }
+
+    public void setL2a_uuid(String l2a_uuid) {
+        this.l2a_uuid = l2a_uuid;
+    }
+
+    public String getL2a_path() {
+        return l2a_path;
+    }
+
+    public void setL2a_path(String l2a_path) {
+        this.l2a_path = l2a_path;
+    }
+
+    public String getBand_name() {
+        return band_name;
+    }
+
+    public void setBand_name(String band_name) {
+        this.band_name = band_name;
+    }
+
+    public String getBand_path() {
+        return band_path;
+    }
+
+    public void setBand_path(String band_path) {
+        this.band_path = band_path;
+    }
+
+    public String getResolution() {
+        return resolution;
+    }
+
+    public void setResolution(String resolution) {
+        this.resolution = resolution;
+    }
+
+    public String getUtm() {
+        return utm;
+    }
+
+    public void setUtm(String utm) {
+        this.utm = utm;
+    }
+
+    @Override
+    public String toString() {
+        return this.band_name + "|band_path:" + 
this.band_path.substring(this.band_path.lastIndexOf("/")+1) + "|" + 
this.resolution + "|" + this.utm + "|l2a_path:" + 
this.l2a_path.substring(this.l2a_path.lastIndexOf("/")+1);
+    }
+}
diff --git 
a/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/patches/BandMetadata.java
 
b/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/patches/BandMetadata.java
new file mode 100644
index 00000000..90d48547
--- /dev/null
+++ 
b/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/patches/BandMetadata.java
@@ -0,0 +1,17 @@
+package org.apache.wayang.agoraeo.patches;
+
+//import org.
+
+public class BandMetadata {
+
+    public BandMetadata(Band b) {
+
+//         band_raster
+//                band_source=source,
+//                local_path=local_path,
+//                ul=(ulx, uly),
+//        lr=(lrx, lry),
+//        resolution=resolution,
+//                size=size,
+    }
+}
diff --git 
a/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/patches/L2a_file.java
 
b/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/patches/L2a_file.java
new file mode 100644
index 00000000..b772425e
--- /dev/null
+++ 
b/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/patches/L2a_file.java
@@ -0,0 +1,45 @@
+package org.apache.wayang.agoraeo.patches;
+
+import java.io.Serializable;
+
+public class L2a_file implements Serializable {
+
+    String l2a_uuid;
+    String l2a_path;
+    String l1c_uuid;
+
+    public L2a_file(String l2a_uuid, String l2a_path, String l1c_uuid) {
+        this.l2a_uuid = l2a_uuid;
+        this.l2a_path = l2a_path;
+        this.l1c_uuid = l1c_uuid;
+    }
+
+    public String getL2a_uuid() {
+        return l2a_uuid;
+    }
+
+    public void setL2a_uuid(String l2a_uuid) {
+        this.l2a_uuid = l2a_uuid;
+    }
+
+    public String getL2a_path() {
+        return l2a_path;
+    }
+
+    public void setL2a_path(String l2a_path) {
+        this.l2a_path = l2a_path;
+    }
+
+    public String getL1c_uuid() {
+        return l1c_uuid;
+    }
+
+    public void setL1c_uuid(String l1c_uuid) {
+        this.l1c_uuid = l1c_uuid;
+    }
+
+    @Override
+    public String toString() {
+        return l2a_uuid + "|" + l2a_path + "|" + l1c_uuid;
+    }
+}

Reply via email to