This is an automated email from the ASF dual-hosted git repository. rpardomeza pushed a commit to branch WAYANG-agoraeo in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit 951a5a0d084b3a17a71ae6ab1a3b536f1c96e075 Author: Rodrigo Pardo Meza <[email protected]> AuthorDate: Wed Feb 8 14:23:36 2023 +0100 Patches generation program from BandMetadata --- .../org/apache/wayang/agoraeo/MakePatches.java | 118 +++++++++++++++------ .../wayang/agoraeo/patches/BandMetadata.java | 7 +- 2 files changed, 87 insertions(+), 38 deletions(-) diff --git a/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/MakePatches.java b/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/MakePatches.java index b86b709c..7fafd2ec 100644 --- a/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/MakePatches.java +++ b/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/MakePatches.java @@ -20,12 +20,10 @@ package org.apache.wayang.agoraeo; import org.apache.wayang.agoraeo.patches.Band; import org.apache.wayang.agoraeo.patches.BandMetadata; -import org.apache.wayang.agoraeo.patches.L2a_file; +import org.apache.wayang.agoraeo.patches.L2aFile; import org.apache.wayang.agoraeo.patches.Patch; -import org.apache.wayang.basic.operators.CollectionSource; -import org.apache.wayang.basic.operators.FlatMapOperator; -import org.apache.wayang.basic.operators.LocalCallbackSink; -import org.apache.wayang.basic.operators.MapOperator; +import org.apache.wayang.basic.data.Tuple2; +import org.apache.wayang.basic.operators.*; import org.apache.wayang.core.api.Configuration; import org.apache.wayang.core.api.WayangContext; import org.apache.wayang.core.api.exception.WayangException; @@ -35,13 +33,11 @@ import org.apache.wayang.java.Java; import org.apache.wayang.java.platform.JavaPlatform; import org.apache.wayang.spark.Spark; import org.apache.wayang.spark.platform.SparkPlatform; +import scala.Tuple4; import java.io.File; -import java.nio.file.Path; -import java.nio.file.Paths; +import java.nio.ByteBuffer; import java.util.*; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -144,10 +140,9 @@ public class MakePatches { */ - /** + /* * Load_patches receives "band_info_df" Dataframe: ("l1c_uuid", "l2a_uuid", "l2a_path", F.explode(find_band_path_udf("l2a_path")).alias("bands")) * over band_info_df.rdd executes a flatmap(gdal_load_and_split) - * * gdal_load_and_split(band_info_row): // Describes the treatment of each row l1c_uuid Uses osgeo.gdal * @@ -156,9 +151,9 @@ public class MakePatches { wayangContext.execute(w, ReflectionUtils.getDeclaringJar(MakePatches.class), ReflectionUtils.getDeclaringJar(JavaPlatform.class), ReflectionUtils.getDeclaringJar(SparkPlatform.class)); - for (BandMetadata res : result) { - System.out.println(res); - } +// for (BandMetadata res : result) { +// System.out.println(res); +// } } public static WayangPlan createWayangPlan( @@ -170,19 +165,19 @@ public class MakePatches { CollectionSource<String> source = new CollectionSource<>(Arrays.asList(inputFileUrl.split(",")), String.class); /*Create dataframe*/ - MapOperator<String, L2a_file> l2a_catalog = new MapOperator<>( + MapOperator<String, L2aFile> l2a_catalog = new MapOperator<>( t -> { // UUID are just generated there, so, do the same UUID l1c_uuid = UUID.randomUUID(); UUID l2a_uuid = UUID.randomUUID(); - return new L2a_file(l2a_uuid.toString(), t, l1c_uuid.toString()); + return new L2aFile(l2a_uuid.toString(), t, l1c_uuid.toString()); }, String.class, - L2a_file.class + L2aFile.class ); /* TODO: BigEarthNet Pipeline */ - FlatMapOperator<L2a_file, Band> bands = new FlatMapOperator<>( + FlatMapOperator<L2aFile, Band> bands = new FlatMapOperator<>( t -> { List<File> paths = Stream.of(Objects.requireNonNull(new File(t.getL2a_path() + "/GRANULE").listFiles())) .filter(File::isDirectory) @@ -200,32 +195,31 @@ public class MakePatches { return bands_resolution; }, - L2a_file.class, + L2aFile.class, Band.class ); MapOperator<Band, BandMetadata> metadata = new MapOperator<>(BandMetadata::new, Band.class,BandMetadata.class); - FlatMapOperator<BandMetadata, Patch> patches_data = new FlatMapOperator<BandMetadata, Patch>( - t -> { - List<Patch> band_patches = createPatchesPerBand(t); - return band_patches; - }, + FlatMapOperator<BandMetadata, Patch> patches_data = new FlatMapOperator<>( + t -> createPatchesPerBand(t), BandMetadata.class, Patch.class ); - LocalCallbackSink<BandMetadata> sink = LocalCallbackSink.createCollectingSink(result, BandMetadata.class); - // his band.resolution = our pixel_resolution +// LocalCallbackSink<BandMetadata> sink = LocalCallbackSink.createCollectingSink(result, BandMetadata.class); -// String outputFileUrl = "file:///Users/rodrigopardomeza/tu-berlin/agoraeo/agoraeo/outputs/patches.log"; -// TextFileSink<String> sink = new TextFileSink<>(outputFileUrl, String.class); + String outputFileUrl = "file:///Users/rodrigopardomeza/tu-berlin/agoraeo/agoraeo/outputs/patches.log"; + TextFileSink<Patch> sink = new TextFileSink<>(outputFileUrl, Patch.class); + + // his band.resolution = our pixel_resolution source.connectTo(0, l2a_catalog,0); l2a_catalog.connectTo(0, bands,0); bands.connectTo(0, metadata,0); - metadata.connectTo(0, sink,0); + metadata.connectTo(0, patches_data,0); + patches_data.connectTo(0, sink,0); // source.connectTo(0,toL2A,0); // toL2A.connectTo(0,sink,0); @@ -235,10 +229,66 @@ public class MakePatches { private static List<Patch> createPatchesPerBand(BandMetadata t) { /* TODO: Pipeline to get the data for patches from each Band */ - return new ArrayList<>(); + Tuple2<Integer, Integer> steps = t.getSteps(); + List<Patch> patches = new ArrayList<>(); + + for(int x = 0; x < steps.field0; x++) { + for(int y = 0; y < steps.field1; y++) { +// System.out.println("Creating Patch: " + t.getUtm() + "|" + t.getBand_name() + "|" + x + "," + y); + + Tuple4<Double, Double, Double, Double> patchCoords = getPatchCoords(x, y, t.getUl(), t.getPatch_size(), t.getPixel_resolution()); + + /* Patch size is the number of pixels of the patch + xoff if the number of pixels to skip before reading in x dimension */ + int xoff = x * t.getPatch_size().field0; + int yoff = y * t.getPatch_size().field1; + + /* While the patch_size is the number of pixels to read */ + // TODO: Impossible to create Numeric Type Numpy Array in Java. Can we generate it in Python as output? + ByteBuffer buffy = t.getBand_raster().ReadRaster_Direct(xoff, yoff, t.getPatch_size().field0, t.getPatch_size().field1); + byte[] data = new byte[buffy.remaining()]; + buffy.get(data); + + Patch patch = new Patch( + t.getUtm(), + t.getBand_resolution(), + t.getBand_name(), + patchCoords._1(), + patchCoords._2(), + patchCoords._3(), + patchCoords._4(), + new Tuple2<>(x, y), + t.getEspg(), + t.getProjection(), + data, + t.getPixel_resolution() + ); + + patches.add(patch); + } + } + + return patches; + } + + /* Returns Specific coordinates of the Patch in this specific Band regarding its resolution + (Not necessarily the coordinates in the Raster(s), Model only provide geographic space integrity) */ + private static Tuple4<Double, Double, Double, Double> getPatchCoords(int x, int y, Tuple2<Double, Double> ul, Tuple2<Integer, Integer> patch_size, Tuple2<Double, Double> pixel_resolution) { + + /* Patch size (in pixels) is defined regarding the pixel resolution, coord_x_step multiplies it by + pixel_resolution (long in mm) translating it into coordinates*/ + double coord_x_step = patch_size.field0 * pixel_resolution.field0; + double coord_y_step = patch_size.field1 * pixel_resolution.field1; + + /* Calculate the limits of the corners for current patch */ + Double ulx = ul.field0 + x * coord_x_step; + Double uly = ul.field1 + y * coord_y_step; + Double lrx = ul.field0 + (x + 1) * coord_x_step; + Double lry = ul.field1 + (y + 1) * coord_y_step; + return new Tuple4<>(ulx, uly, lrx, lry); } - private static List<Band> processResolution(String img_folder, String resolution, L2a_file l2a_file) { + private static List<Band> processResolution(String img_folder, String resolution, L2aFile l2A_file) { // Missing to filter only the bands relevant to each resolution return Stream.of(Objects.requireNonNull(new File(img_folder + "/" + resolution).listFiles())) @@ -247,9 +297,9 @@ public class MakePatches { .map(t -> { String[] parts = t.getName().split("_"); return new Band( - l2a_file.getL1c_uuid(), - l2a_file.getL2a_uuid(), - l2a_file.getL2a_path(), + l2A_file.getL1c_uuid(), + l2A_file.getL2a_uuid(), + l2A_file.getL2a_path(), parts[2], img_folder + "/" + resolution + "/" + t.getName(), parts[3].substring(0,2), diff --git a/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/patches/BandMetadata.java b/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/patches/BandMetadata.java index 55223e0b..419b1745 100644 --- a/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/patches/BandMetadata.java +++ b/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/patches/BandMetadata.java @@ -167,17 +167,16 @@ public class BandMetadata implements Serializable { ; } - public String getSteps(){ + public Tuple2<Integer, Integer> getSteps(){ /* TODO: Big difference, we are delivering different sizes per resolution of the same UTM or l1c image * V delivered only regarding to one * * For UTM products until now generates always 91x91 patches * */ - Tuple2<Integer, Integer> steps = new Tuple2<>( + return new Tuple2<>( size.field0 / this.patch_size.field0, size.field1 / this.patch_size.field1 - ) ; - return steps.toString(); + ); } }
