This is an automated email from the ASF dual-hosted git repository. desruisseaux pushed a commit to branch geoapi-4.0 in repository https://gitbox.apache.org/repos/asf/sis.git
commit e11eee17700132903d380cc71bd58aa5598a877e Author: Martin Desruisseaux <[email protected]> AuthorDate: Fri Jan 22 22:55:02 2021 +0100 Implement parallel (multi-threaded) computation of isolines. --- .../sis/internal/feature/j2d/PathBuilder.java | 22 +- .../internal/processing/image/CompoundFuture.java | 202 +++++++++++ .../internal/processing/image/IsolineTracer.java | 102 +++++- .../sis/internal/processing/image/Isolines.java | 191 ++++++++-- .../internal/processing/image/TiledProcess.java | 394 +++++++++++++++++++++ 5 files changed, 869 insertions(+), 42 deletions(-) diff --git a/core/sis-feature/src/main/java/org/apache/sis/internal/feature/j2d/PathBuilder.java b/core/sis-feature/src/main/java/org/apache/sis/internal/feature/j2d/PathBuilder.java index faf54a8..a971f92 100644 --- a/core/sis-feature/src/main/java/org/apache/sis/internal/feature/j2d/PathBuilder.java +++ b/core/sis-feature/src/main/java/org/apache/sis/internal/feature/j2d/PathBuilder.java @@ -65,11 +65,25 @@ public class PathBuilder { * Verifies that {@link #size} is even, positive and smaller than the given limit. * This method is used for assertions. */ - private boolean validSize(final int limit) { + private boolean isValidSize(final int limit) { return size >= 0 && size <= limit && (size & 1) == 0; } /** + * Adds all polylines defined in the other builder. The other builder shall have no polylines under + * construction, i.e. {@link #append(double[], int, boolean)} shall not have been invoked since last + * {@link #createPolyline(boolean)} invocation. + * + * @param other the other builder for which to add polylines, or {@code null} if none. + */ + public final void append(final PathBuilder other) { + if (other != null) { + assert other.size == 0; + polylines.addAll(other.polylines); + } + } + + /** * Appends the given coordinates to current polyline, omitting repetitive points. * Coordinates are added to the same polyline than the one updated by previous calls * to this method, unless {@link #createPolyline(boolean)} has been invoked before. @@ -115,7 +129,7 @@ public class PathBuilder { if (Double.isNaN(x) || Double.isNaN(y)) { if (offset != 0) { size = filterChunk(coordinates, size, offset); - assert validSize(offset) : size; + assert isValidSize(offset) : size; createPolyline(false); offset = 0; } @@ -128,7 +142,7 @@ public class PathBuilder { } } size = filterChunk(coordinates, size, offset); - assert validSize(offset) : size; + assert isValidSize(offset) : size; } /** @@ -184,7 +198,7 @@ public class PathBuilder { */ public final void createPolyline(boolean close) throws TransformException { size = filterFull(coordinates, size); - assert validSize(coordinates.length) : size; + assert isValidSize(coordinates.length) : size; /* * If the point would be alone, discard the lonely point because it would be invisible * (a "move to" operation without "line to"). If there is two points, they should not diff --git a/core/sis-feature/src/main/java/org/apache/sis/internal/processing/image/CompoundFuture.java b/core/sis-feature/src/main/java/org/apache/sis/internal/processing/image/CompoundFuture.java new file mode 100644 index 0000000..c757108 --- /dev/null +++ b/core/sis-feature/src/main/java/org/apache/sis/internal/processing/image/CompoundFuture.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sis.internal.processing.image; + +import java.util.Set; +import java.util.HashSet; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.CancellationException; +import org.apache.sis.internal.util.CollectionsExt; +import org.apache.sis.internal.feature.Resources; + + +/** + * The result of multiple asynchronous computations. + * This {@code Future} is considered completed when all components are completed. + * + * @author Martin Desruisseaux (Geomatys) + * @version 1.1 + * @since 1.1 + * @module + */ +final class CompoundFuture<R> implements Future<R> { + /** + * The elements making this computation. + */ + private final Future<R>[] components; + + /** + * Creates a new future with the given components. + */ + private CompoundFuture(final Future<R>[] components) { + this.components = components; + } + + /** + * Returns a future waiting for all given tasks to complete. + * If the array length is 1, then this method returns directly its singleton element. + * + * @param <R> type if result computed by tasks. + * @param components the sub-tasks to execute. This array is not cloned; do not modify. + * @return a future containing all given tasks. + */ + public static <R> Future<R> create(final Future<R>[] components) { + switch (components.length) { + case 0: return null; + case 1: return components[0]; + } + return new CompoundFuture<>(components); + } + + /** + * Attempts to cancel execution of this task. After this method return, subsequent calls + * to {@link #isCancelled()} return {@code true} if this method returned {@code true}. + * + * <h4>Departure from specification</h4> + * {@code Future} specification requires that after this method returns, subsequent calls + * to {@link #isDone()} return {@code true}. This is not guaranteed in this implementation. + * + * @param mayInterruptIfRunning whether the thread executing tasks should be interrupted. + * @return {@code true} if at least one component task could be interrupted. + */ + @Override + public boolean cancel(final boolean mayInterruptIfRunning) { + boolean canceled = false; + for (final Future<R> c : components) { + canceled |= c.cancel(mayInterruptIfRunning); + } + return canceled; + } + + /** + * Returns {@code true} if this task was cancelled before it completed normally. + * This task is considered cancelled if at least one component has been cancelled. + * + * @return {@code true} if at least one component task was cancelled before it completed. + */ + @Override + public boolean isCancelled() { + for (final Future<R> c : components) { + if (c.isCancelled()) return true; + } + return false; + } + + /** + * Returns {@code true} if this task completed. + * Completion may be due to normal termination, an exception, or cancellation. + * + * @return {@code true} if all component tasks completed. + */ + @Override + public boolean isDone() { + for (final Future<R> c : components) { + if (!c.isDone()) return false; + } + return true; + } + + /** + * Waits if necessary for all computations to complete, and then retrieves the result. + * If all task components return either {@code null} or the same {@code <R>} value, + * then that result is returned. Otherwise the various {@code <R>} values are given + * to {@link #merge(Collection)} for obtaining a single result. + * + * @return the computed result. + * @throws CancellationException if at least one computation was cancelled. + * @throws ExecutionException if at least one computation threw an exception. + * @throws InterruptedException if the current thread was interrupted while waiting. + */ + @Override + public R get() throws InterruptedException, ExecutionException { + try { + return get(0, true); + } catch (TimeoutException e) { + // Should never happen because we specified `noTimeOut = true` + throw new AssertionError(e); + } + } + + /** + * Same as {@link #get()} but with a timeout. The given timeout is the total timeout; + * each component task may have a smaller timeout for keeping the total equal to the + * given value. + * + * @param timeout the maximum time to wait. + * @param unit the time unit of the timeout argument. + * @throws CancellationException if at least one computation was cancelled. + * @throws ExecutionException if at least one computation threw an exception. + * @throws InterruptedException if the current thread was interrupted while waiting. + * @throws TimeoutException if the wait timed out. + */ + @Override + public R get(final long timeout, final TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException + { + return get(System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeout, unit), false); + } + + /** + * Implementation of public {@code get(…)} methods. + * The timeout given to this method, if not ignored, is an absolute timeout. + * + * @param timeout {@link System#nanoTime()} value when to stop waiting. + * @param noTimeOut {@code true} if {@code timeout} should be ignored. + */ + private R get(final long timeout, final boolean noTimeOut) + throws InterruptedException, ExecutionException, TimeoutException + { + R singleton = null; + Set<R> results = null; + for (final Future<R> c : components) { + final R r = noTimeOut ? c.get() : c.get(Math.max(0, timeout - System.nanoTime()), TimeUnit.NANOSECONDS); + if (r != null) { + if (singleton == null) { + singleton = r; + } else if (r != singleton) { + if (results == null) { + results = new HashSet<>(); + results.add(singleton); + } + results.add(r); + } + } + } + if (results != null) { + singleton = merge(results); + } + return singleton; + } + + /** + * Invoked by {@code get(…)} if there is more than one non-null instance. + * The default implementation throws an exception. + * + * @param results all non-null instances found. + * @return the unique instance to return. + */ + protected R merge(final Set<R> results) { + final R singleton = CollectionsExt.singletonOrNull(results); + if (singleton != null) { + return singleton; + } + throw new IllegalStateException(Resources.format(Resources.Keys.NotASingleton_1, "get()")); + } +} diff --git a/core/sis-feature/src/main/java/org/apache/sis/internal/processing/image/IsolineTracer.java b/core/sis-feature/src/main/java/org/apache/sis/internal/processing/image/IsolineTracer.java index 5545965..246d8a5 100644 --- a/core/sis-feature/src/main/java/org/apache/sis/internal/processing/image/IsolineTracer.java +++ b/core/sis-feature/src/main/java/org/apache/sis/internal/processing/image/IsolineTracer.java @@ -17,11 +17,13 @@ package org.apache.sis.internal.processing.image; import java.util.Arrays; +import java.util.ArrayList; import java.util.Map; import java.util.HashMap; -import java.util.ArrayList; +import java.util.IdentityHashMap; import java.util.Collections; import java.awt.Point; +import java.awt.Rectangle; import java.awt.Shape; import org.opengis.referencing.operation.MathTransform; import org.opengis.referencing.operation.TransformException; @@ -77,7 +79,15 @@ final class IsolineTracer { int y; /** + * Translation to apply on coordinates. For isolines computed sequentially, this is the image origin + * (often 0,0 but not necessarily). For isolines computed in parallel, the translations are different + * for each computation tile. + */ + private final double translateX, translateY; + + /** * Final transform to apply on coordinates (integer source coordinates at pixel centers). + * Can be {@code null} if none. */ private final MathTransform gridToCRS; @@ -86,11 +96,14 @@ final class IsolineTracer { * * @param window the 2×2 window containing pixel values in the 4 corners of current contouring grid cell. * @param pixelStride increment to the position in {@code window} for reading next sample value. + * @param domain pixel coordinates where iteration will happen. * @param gridToCRS final transform to apply on coordinates (integer source coordinates at pixel centers). */ - IsolineTracer(final double[] window, final int pixelStride, final MathTransform gridToCRS) { + IsolineTracer(final double[] window, final int pixelStride, final Rectangle domain, final MathTransform gridToCRS) { this.window = window; this.pixelStride = pixelStride; + this.translateX = domain.x; + this.translateY = domain.y; this.gridToCRS = gridToCRS; } @@ -428,7 +441,8 @@ final class IsolineTracer { */ private void interpolateMissingLeftSide() { if (polylineOnLeft.size == 0) { - polylineOnLeft.append(x, y + interpolate(0, 2*pixelStride)); + polylineOnLeft.append(translateX + (x), + translateY + (y + interpolate(0, 2*pixelStride))); } } @@ -446,7 +460,8 @@ final class IsolineTracer { * Appends to the given polyline a point interpolated on the top side. */ private void interpolateOnTopSide(final Polyline appendTo) { - appendTo.append(x + interpolate(0, pixelStride), y); + appendTo.append(translateX + (x + interpolate(0, pixelStride)), + translateY + (y)); } /** @@ -454,7 +469,8 @@ final class IsolineTracer { * The polyline on right side will become {@code polylineOnLeft} in next column. */ private void interpolateOnRightSide() { - polylineOnLeft.append(x + 1, y + interpolate(pixelStride, 3*pixelStride)); + polylineOnLeft.append(translateX + (x + 1), + translateY + (y + interpolate(pixelStride, 3*pixelStride))); } /** @@ -462,7 +478,8 @@ final class IsolineTracer { * The polyline on top side will become a {@code polylineOnBottoù} in next row. */ private void interpolateOnBottomSide(final Polyline polylineOnTop) { - polylineOnTop.append(x + interpolate(2*pixelStride, 3*pixelStride), y + 1); + polylineOnTop.append(translateX + (x + interpolate(2*pixelStride, 3*pixelStride)), + translateY + (y + 1)); } /** @@ -546,14 +563,19 @@ final class IsolineTracer { private void writeUnclosed(final Polyline polyline) throws TransformException { final Unclosed fragment = new Unclosed(polyline, null); final Polyline[] polylines; + final boolean close; if (fragment.isEmpty()) { + close = false; polylines = new Polyline[] {polyline.opposite, polyline}; // (reverse, forward) point order. - } else if (fragment.addOrMerge(partialPaths)) { - polylines = fragment.toPolylines(); } else { - return; + close = fragment.addOrMerge(partialPaths); + if (!close) { + // Keep in `partialPaths`. Maybe it can be closed later. + return; + } + polylines = fragment.toPolylines(); } - path = writeTo(path, polylines, false); + path = writeTo(path, polylines, close); } /** @@ -570,7 +592,7 @@ final class IsolineTracer { /** * Invoked after the iteration has been completed on the full area of interest. - * This method writes all remaining polylines to {@link #path} or {@link #partialPaths}. + * This method writes all remaining polylines to {@link #partialPaths}. * It assumes that {@link #finishedRow()} has already been invoked. * This {@code Isoline} can not be used anymore after this call. */ @@ -586,6 +608,58 @@ final class IsolineTracer { writeUnclosed(polylinesOnTop[i]); polylinesOnTop[i] = null; } + assert isConsistent(); + } + + /** + * Verifies that {@link #partialPaths} consistency. Used for assertions only. + */ + private boolean isConsistent() { + for (final Map.Entry<Point,Unclosed> entry : partialPaths.entrySet()) { + if (!entry.getValue().isExtremity(entry.getKey())) return false; + } + return true; + } + + /** + * Transfers all {@code other} polylines into this instance. The {@code other} instance should be a neighbor, + * i.e. an instance sharing a border with this instance. The {@code other} instance will become empty after + * this method call. + * + * @param other a neighbor level (on top, left, right or bottom) to merge with this level. + * @throws TransformException if an error occurred during polylines creation. + */ + final void merge(final Level other) throws TransformException { + assert other != this && other.value == value; + if (path == null) { + path = other.path; + } else { + path.append(other.path); + } + other.path = null; + assert this.isConsistent(); + assert other.isConsistent(); + final IdentityHashMap<Unclosed,Boolean> done = new IdentityHashMap<>(other.partialPaths.size() / 2); + for (final Map.Entry<Point,Unclosed> entry : other.partialPaths.entrySet()) { + final Unclosed fragment = entry.getValue(); + if (done.put(fragment, Boolean.TRUE) == null) { + assert fragment.isExtremity(entry.getKey()); + if (fragment.addOrMerge(partialPaths)) { + path = writeTo(path, fragment.toPolylines(), true); + fragment.clear(); + } + } + } + } + + /** + * Flushes any pending {@link #partialPaths} to {@link #path}. This method is invoked after + * {@link #finish()} has been invoked for all sub-regions (many sub-regions may exist if + * isoline generation has been splitted for parallel computation). + * + * @throws TransformException if an error occurred during polylines creation. + */ + final void flush() throws TransformException { for (final Map.Entry<Point,Unclosed> entry : partialPaths.entrySet()) { final Unclosed fragment = entry.getValue(); assert fragment.isExtremity(entry.getKey()); @@ -1051,7 +1125,7 @@ final class IsolineTracer { */ private static final class Joiner extends PathBuilder { /** - * Final transform to apply on coordinates. + * Final transform to apply on coordinates, or {@code null} if none. */ private final MathTransform gridToCRS; @@ -1142,7 +1216,9 @@ final class IsolineTracer { */ @Override protected int filterFull(final double[] coordinates, final int upper) throws TransformException { - gridToCRS.transform(coordinates, 0, coordinates, 0, upper / Polyline.DIMENSION); + if (gridToCRS != null) { + gridToCRS.transform(coordinates, 0, coordinates, 0, upper / Polyline.DIMENSION); + } return upper; } } diff --git a/core/sis-feature/src/main/java/org/apache/sis/internal/processing/image/Isolines.java b/core/sis-feature/src/main/java/org/apache/sis/internal/processing/image/Isolines.java index 3be55e0..4f0a060 100644 --- a/core/sis-feature/src/main/java/org/apache/sis/internal/processing/image/Isolines.java +++ b/core/sis-feature/src/main/java/org/apache/sis/internal/processing/image/Isolines.java @@ -19,13 +19,13 @@ package org.apache.sis.internal.processing.image; import java.util.Arrays; import java.util.TreeMap; import java.util.NavigableMap; +import java.util.concurrent.Future; import java.awt.Shape; import java.awt.geom.Path2D; import java.awt.image.RenderedImage; import org.opengis.coverage.grid.SequenceType; import org.opengis.referencing.operation.MathTransform; import org.opengis.referencing.operation.TransformException; -import org.apache.sis.referencing.operation.transform.MathTransforms; import org.apache.sis.image.PixelIterator; import org.apache.sis.util.ArgumentChecks; import org.apache.sis.util.ArraysExt; @@ -55,27 +55,40 @@ public final class Isolines { private final IsolineTracer.Level[] levels; /** - * Creates an initially empty set of isolines for the given levels. - * The given array should be a clone of user-provided array because - * this constructor may modify it in-place. + * Creates an initially empty set of isolines for the given levels. The given {@code values} + * array should be one of the arrays validated by {@link #cloneAndSort(double[][])}. */ private Isolines(final IsolineTracer tracer, final int band, final double[] values, final int width) { - Arrays.sort(values); - int n = values.length; - while (n > 0 && Double.isNaN(values[n-1])) n--; - for (int i=n; --i>0;) { - if (values[i] == values[i-1]) { - // Remove duplicated elements. May replace -0 by +0. - System.arraycopy(values, i, values, i-1, n-- - i); - } - } - levels = new IsolineTracer.Level[n]; - for (int i=0; i<n; i++) { + levels = new IsolineTracer.Level[values.length]; + for (int i=0; i<values.length; i++) { levels[i] = tracer.new Level(band, values[i], width); } } /** + * Ensures that the given arrays are sorted and contains no NaN value. + */ + private static double[][] cloneAndSort(double[][] levels) { + levels.clone(); + for (int j=0; j<levels.length; j++) { + double[] values = levels[j]; + ArgumentChecks.ensureNonNullElement("levels", j, values); + values = values.clone(); + Arrays.sort(values); + int n = values.length; + while (n > 0 && Double.isNaN(values[n-1])) n--; + for (int i=n; --i>0;) { + if (values[i] == values[i-1]) { + // Remove duplicated elements. May replace -0 by +0. + System.arraycopy(values, i, values, i-1, n-- - i); + } + } + levels[j] = ArraysExt.resize(values, n); + } + return levels; + } + + /** * Sets the specified bit on {@link IsolineTracer.Level#isDataAbove} for all levels lower than given value. * * <h4>How strict equalities are handled</h4> @@ -106,6 +119,7 @@ public final class Isolines { /** * Generates isolines at the specified levels computed from data provided by the given image. * Isolines will be created for every bands in the given image. + * This method performs all computation sequentially in current thread. * * @param data image providing source values. * @param levels values for which to compute isolines. An array should be provided for each band. @@ -117,20 +131,147 @@ public final class Isolines { * @throws TransformException if an interpolated point can not be transformed using the given transform. */ public static Isolines[] generate(final RenderedImage data, final double[][] levels, - MathTransform gridToCRS) throws TransformException + final MathTransform gridToCRS) throws TransformException { ArgumentChecks.ensureNonNull("data", data); ArgumentChecks.ensureNonNull("levels", levels); - { // For keeping variable locale. - final MathTransform gridToImage = MathTransforms.translation(data.getMinX(), data.getMinY()); - if (gridToCRS == null) { - gridToCRS = gridToImage; - } else { - ArgumentChecks.ensureDimensionsMatch("gridToCRS", 2, 2, gridToCRS); - gridToCRS = MathTransforms.concatenate(gridToImage, gridToCRS); + return flush(generate(iterators().create(data), cloneAndSort(levels), gridToCRS)); + } + + /** + * Generates isolines in background using an arbitrary amount of processors. + * This method returns immediately (i.e. the current thread is not used for isoline computation). + * The result will become available at a later time in the {@link Future} object. + * + * @param data image providing source values. + * @param levels values for which to compute isolines. An array should be provided for each band. + * If there is more bands than {@code levels.length}, the last array is reused for + * all remaining bands. + * @param gridToCRS transform from pixel coordinates to geometry coordinates, or {@code null} if none. + * Integer source coordinates are located at pixel centers. + * @return a {@code Future} representing pending completion of isoline computation. + */ + public static Future<Isolines[]> parallelGenerate(final RenderedImage data, final double[][] levels, + final MathTransform gridToCRS) + { + ArgumentChecks.ensureNonNull("data", data); + ArgumentChecks.ensureNonNull("levels", levels); + return new Process(data, cloneAndSort(levels), gridToCRS).execute(); + } + + /** + * Returns a provider of {@link PixelIterator} suitable to isoline computations. + * It is critical that iterators use {@link SequenceType#LINEAR} iterator order. + */ + private static PixelIterator.Builder iterators() { + return new PixelIterator.Builder().setIteratorOrder(SequenceType.LINEAR); + } + + /** + * Creates all polylines that were still pending. This method should be the very last step, + * when all other computations finished. Pending polylines are sequences of points not yet + * stored in geometry objects because they were waiting to see if additional points would + * close them as polygons. This {@code flush()} method is invoked when we know that it will + * not be the case because there is no more points to add. + * + * @param isolines the isolines for which to write pending polylines. + * @return the {@code isolines} array, returned for convenience. + * @throws TransformException if an error occurred during polylines creation. + */ + private static Isolines[] flush(final Isolines[] isolines) throws TransformException { + for (final Isolines isoline : isolines) { + for (final IsolineTracer.Level level : isoline.levels) { + level.flush(); } } - final PixelIterator iterator = new PixelIterator.Builder().setIteratorOrder(SequenceType.LINEAR).create(data); + return isolines; + } + + /** + * Wraps {@code Isolines.generate(…)} calculation in a process for parallel execution. + * The source image is divided in sub-region and the isolines in each sub-region will + * be computed in a different thread. + */ + private static final class Process extends TiledProcess<Isolines[]> { + /** + * Values for which to compute isolines. An array should be provided for each band. + * If there is more bands than {@code levels.length}, the last array is reused for + * all remaining bands. + * + * @see #cloneAndSort(double[][]) + */ + private final double[][] levels; + + /** + * Transform from image upper left corner (in pixel coordinates) to geometry coordinates. + */ + private final MathTransform gridToCRS; + + /** + * Creates a process for parallel isoline computation. + * + * @param data image providing source values. + * @param levels values for which to compute isolines. + * @param gridToCRS transform from pixel coordinates to geometry coordinates, or {@code null} if none. + */ + Process(final RenderedImage data, final double[][] levels, final MathTransform gridToCRS) { + super(data, 1, 1, iterators()); + this.levels = levels; + this.gridToCRS = gridToCRS; + } + + /** + * Invoked by {@link TiledProcess} for creating a sub-task + * doing isoline computation on a sub-region of the image. + */ + @Override + protected Task createSubTask() { + return new Tile(); + } + + /** + * A sub-task doing isoline computation on a sub-region of the image. + * The region is determined by the {@link #iterator}. + */ + private final class Tile extends Task { + /** Isolines computed in the sub-region of this sub-task. */ + private Isolines[] isolines; + + /** Creates a new sub-task. */ + Tile() { + } + + /** Invoked in a background thread for performing isoline computation. */ + @Override protected void execute() throws TransformException { + isolines = generate(iterator, levels, gridToCRS); + } + + /** Invoked in a background thread for merging results of two sub-tasks. */ + @Override protected void merge(final Task neighbor) throws TransformException { + for (int i=0; i<isolines.length; i++) { + final Isolines target = isolines[i]; + final Isolines source = ((Tile) neighbor).isolines[i]; + for (int j=0; j<target.levels.length; j++) { + target.levels[j].merge(source.levels[j]); + } + } + } + + /** Invoked on the last sub-task (after all merges) for getting final result. */ + @Override protected Isolines[] result() throws TransformException { + return flush(isolines); + } + } + } + + /** + * Generates isolines for the specified levels in the region traversed by the given iterator. + * This is the implementation of {@link #generate(RenderedImage, double[][], MathTransform)} + * but potentially on a sub-region for parallel "fork-join" execution. + */ + private static Isolines[] generate(final PixelIterator iterator, final double[][] levels, + final MathTransform gridToCRS) throws TransformException + { /* * Prepares a window of size 2×2 pixels over pixel values. Window elements are traversed * by incrementing indices in following order: band, column, row. The window content will @@ -138,7 +279,7 @@ public final class Isolines { */ final int numBands = iterator.getNumBands(); final double[] window = new double[numBands * 4]; - final IsolineTracer tracer = new IsolineTracer(window, numBands, gridToCRS); + final IsolineTracer tracer = new IsolineTracer(window, numBands, iterator.getDomain(), gridToCRS); /* * Prepare the set of isolines for each band in the image. * The number of cells on the horizontal axis is one less diff --git a/core/sis-feature/src/main/java/org/apache/sis/internal/processing/image/TiledProcess.java b/core/sis-feature/src/main/java/org/apache/sis/internal/processing/image/TiledProcess.java new file mode 100644 index 0000000..350cdd8 --- /dev/null +++ b/core/sis-feature/src/main/java/org/apache/sis/internal/processing/image/TiledProcess.java @@ -0,0 +1,394 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sis.internal.processing.image; + +import java.awt.Rectangle; +import java.awt.image.RenderedImage; +import java.util.concurrent.Future; +import java.util.concurrent.Callable; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.sis.internal.system.CommonExecutor; +import org.apache.sis.internal.util.Numerics; +import org.apache.sis.internal.jdk9.JDK9; +import org.apache.sis.image.PixelIterator; +import org.apache.sis.util.ArgumentChecks; + + +/** + * Calculation in a two-dimensional space that can be subdivided in smaller calculations in sub-regions. + * This class manages a kind of fork-join task but differs from {@link java.util.concurrent.ForkJoinPool} + * in the following aspects: + * + * <ul class="verbose"> + * <li>The fork and join processes are presumed relatively costly (i.e. they may need to reconstruct + * geometries splitted in two consecutive tiles). So instead of having many medium tasks waiting + * for a thread to take them, it may be more efficient to have fewer tasks processing larger areas. + * {@code TiledProcess} tries to create a number of sub-tasks close to the number of processors. + * This is a different approach than "work stealing" algorithm applied by JDK {@code ForkJoinPool}, + * which is designed for smaller (and more easily separable) non-blocking tasks.</li> + * <li>The main task is splitted in sub-tasks with a single fork step, with two division factors along + * <var>x</var> and <var>y</var> axes which can be any integer (not necessarily powers of 2). This + * is a different approach than JDK {@code ForkJoinPool} where tasks are forked recursively in two + * sub-tasks at each step.</li> + * <li>The join operation tries to operate on tiles that are neighbors in the two dimensional space. + * It allows the join operation to merge geometries that are splited between two tiles.</li> + * <li>Tasks may block on I/O operations. We want to avoid blocking the JDK common fork/join pool, + * so we use a separated pool.</li> + * </ul> + * + * The tiling applied by {@code TiledProcess} is independent of {@link RenderedImage} tiling. + * This class assumes that the objects to be calculated are geometries or other non-raster data. + * Consequently tile size will be determined by other considerations such as the number of processors. + * + * @author Martin Desruisseaux (Geomatys) + * @version 1.1 + * + * @param <R> the type of value computed as a result of this process. + * + * @since 1.1 + * @module + */ +abstract class TiledProcess<R> { + /** + * Minimal "tile" size for sub-tasks computation. That size should not be too small because the + * fork/join processes have some extra cost compared to processing the whole image as one single tile. + */ + private static final int MIN_TILE_SIZE = 400; + + /** + * Number of threads that are running. This is used for knowing when a thread is the last one. + */ + private final AtomicInteger runningThreads; + + /** + * All tasks executed in parallel threads. The array length should be less than the number of processors. + * All read and write accesses to array elements must be done inside a {@code synchronized(tasks)} block. + * + * <p>This array initially contains only {@code null} elements. Non-null elements are assigned after + * {@link Task#execute()} completion but before {@link Task#merge(Task)}.</p> + */ + private final Task[] tasks; + + /** + * Number of tiles (or tasks) on the <var>x</var> axis. Used for computing (x,y) coordinates of elements + * in the {@link #tasks} array, considering that the array is a matrix encoded in row-major fashion. + * This is the increment to apply on array index for incrementing the <var>y</var> coordinate by one. + */ + private final int yStride; + + /** + * Index of this {@code TiledProcess} instance in the {@link #tasks} array. + * This is a temporary variable for {@link Task#index} initialization only. + */ + private int taskIndex; + + /** + * Iterator over the pixel for each element in the {@link #tasks} array. + * This is a temporary variable for {@link Task#iterator} initialization only. + */ + private PixelIterator[] iterators; + + /** + * Prepares {@link TiledProcess} for execution of a task splitted in different regions. + * This constructor splits the given image in sub-regions ("tiles" but not in the sense + * of {@link RenderedImage} tiles), then creates a pixel iterator for each sub-region. + * Iterators are created with the given {@link org.apache.sis.image.PixelIterator.Builder}, + * which should be configured by the caller in all desired aspects (e.g. iteration order) except the + * {@linkplain org.apache.sis.image.PixelIterator.Builder#setRegionOfInterest(Rectangle) region of interest}, + * which will be overwritten by this method. + * + * <p>Usage example:</p> + * {@preformat java + * TiledProcess process = new TiledProcess<MyResultType>(image, 0, 0, + * new PixelIterator.Builder().setIteratorOrder(SequenceType.LINEAR)) + * { + * @Override + * protected Task createSubTask() { + * return new SubTask(); + * } + * + * private final class SubTask extends Task { + * private MyResultType result; + * + * @Override protected void execute() {result = ...} // Do calculation in background thread. + * @Override protected void merge(Task neighbor) {...} // Merge this.result with neighbor.result(). + * @Override protected MyResultType result() {return result;} + * } + * }; + * process.execute(); + * } + * + * @param data the image on which to iterate over the pixels. + * @param overlapX the number of overlapping pixels between tiles on the <var>x</var> axis. + * @param overlapY the number of overlapping pixels between tiles on the <var>y</var> axis. + * @param iteratorFactory a pre-configured (except for sub-region aspect) supplier of pixel iterators. + * @throws ArithmeticException if the image size is too large. + */ + @SuppressWarnings({"rawtypes", "unchecked"}) // Generic array creation. + protected TiledProcess(final RenderedImage data, final int overlapX, final int overlapY, + final PixelIterator.Builder iteratorFactory) + { + ArgumentChecks.ensurePositive("overlapX", overlapX); + ArgumentChecks.ensurePositive("overlapY", overlapY); + /* + * Get a first estimation of the number of tiles. If the total number is greater than `PARALLELISM` + * (number of processors - 1), scale the number of tiles for having a total less than `PARALLELISM`. + */ + final int width = data.getWidth(); + final int height = data.getHeight(); + int numTileX = Math.max(width / MIN_TILE_SIZE, 1); + int numTileY = Math.max(height / MIN_TILE_SIZE, 1); + long numTiles = JDK9.multiplyFull(numTileX, numTileY); + if (numTiles > CommonExecutor.PARALLELISM) { + final double r = Math.sqrt(CommonExecutor.PARALLELISM / (double) numTiles); // Always < 1. + if (numTileX >= numTileY) { + numTileX = Math.max((int) Math.round(numTileX * r), 1); + numTileY = Math.max(Math.min(CommonExecutor.PARALLELISM / numTileX, numTileY), 1); + } else { + numTileY = Math.max((int) Math.round(numTileY * r), 1); + numTileX = Math.max(Math.min(CommonExecutor.PARALLELISM / numTileY, numTileX), 1); + } + } + yStride = numTileX; + tasks = new TiledProcess.Task[numTileX * numTileY]; // length ≤ CommonExecutor.PARALLELISM. + runningThreads = new AtomicInteger(tasks.length); + /* + * Prepare the pixel iterators for all sub-tasks, without starting those sub-tasks yet. + * The sub-tasks will be created and started by `execute()` after everything is ready. + */ + final int xmin = data.getMinX(); + final int ymin = data.getMinY(); + final int xmax = Math.addExact(xmin, width); + final int ymax = Math.addExact(ymin, height); + final int xinc = Numerics.ceilDiv(width, numTileX); + final int yinc = Numerics.ceilDiv(height, numTileY); + final Rectangle subArea = new Rectangle(Math.addExact(xinc, overlapX), + Math.addExact(yinc, overlapY)); + int count = 0; + iterators = new PixelIterator[tasks.length]; + for (subArea.y = ymin; subArea.y < ymax; subArea.y += yinc) { + for (subArea.x = xmin; subArea.x < xmax; subArea.x += xinc) { + iterators[count++] = iteratorFactory.setRegionOfInterest(subArea).create(data); + } + } + assert count == tasks.length; + } + + /** + * Starts execution of each sub-task in its own thread. + * + * @return a {@code Future} representing pending completion of the task. + */ + public final Future<R> execute() { + @SuppressWarnings({"rawtypes", "unchecked"}) + final Future<R>[] components = new Future[tasks.length]; + while (taskIndex < components.length) { + final Task task = createSubTask(); + components[taskIndex++] = CommonExecutor.instance().submit(task); + } + iterators = null; // Clear only after all tasks have been created. + return CompoundFuture.create(components); + } + + /** + * Creates a sub-task doing the computation on a sub-region of the image. + * This method is invoked by {@link #execute()} for each "tile" where the + * sub-task will be executed. Each sub-tasks will have its + * {@linkplain Task#iterator own pixel iterator}. + * + * @return a sub-task over a sub-region of the image to process. + */ + protected abstract Task createSubTask(); + + /** + * A task to be executed in {@link TiledProcess} for a sub-region of the image to process. + * + * <p>This class implements {@link Callable} for {@code TiledProcess} convenience. + * This implementation details should be ignored; it may change in any future version.</p> + */ + abstract class Task implements Callable<R> { + /** + * Index of this {@code Task} instance in the {@link TiledProcess#tasks} array. + */ + private final int index; + + /** + * Iterator over the pixels in the sub-region explored by this task. + */ + protected final PixelIterator iterator; + + /** + * Synchronization lock during the merge phase. Created only before merge attempts. + * This reference shall be considered final after initialization. + */ + private ReentrantLock merging; + + /** + * Creates a new sub-task to be executed in a sub-area of a two-dimensional plane. + */ + protected Task() { + index = taskIndex; + iterator = iterators[index]; + } + + /** + * Executes this sub-task. This method is invoked by {@link TiledProcess#execute()} + * in a background thread. Implementation should store the result in this {@code Task} + * instance for future {@linkplain #merge(Task) merge}. + * + * @throws Exception if an error occurred during sub-task execution. + */ + protected abstract void execute() throws Exception; + + /** + * Merges the result of given sub-task into this one. The given {@code Task} will be a neighbor tile + * (located on top, left, right or bottom of this tile) on a <em>best-effort</em> basis (no guarantee). + * After this method call, all data should be in {@code this} and the {@code neighbor} sub-task will be + * discarded. + * + * @param neighbor the other sub-task to merge with this one. + * @throws Exception if an error occurred during merge operation. + */ + protected abstract void merge(Task neighbor) throws Exception; + + /** + * Returns the computation result. This method is invoked by {@link TiledProcess} only once, + * on the last {@code Task} instance after all other instances have been {@linkplain #merge + * merged} on {@code this}. + * + * @return the computation result. + * @throws Exception if final result can not be computed. + */ + protected abstract R result() throws Exception; + + /** + * Executes this sub-task, then tries to merge it with all neighbor sub-tasks that are completed. + * This method is public as an implementation side-effect and should not be invoked directly; + * it will typically be invoked by {@link java.util.concurrent.Executor} instead. + * + * @return value computed by this sub-task, or {@code null} if this is not the last task. + * @throws Exception if an error occurred during sub-task execution or merge operation. + * + * @see java.util.concurrent.Executor#execute(Runnable) + */ + @Override + @SuppressWarnings("fallthrough") + public final R call() throws Exception { + execute(); + final Task[] tasks = TiledProcess.this.tasks; + final int yStride = TiledProcess.this.yStride; + final int rowStart = index - (index % yStride); + /* + * No lock can exist for this `Task` before this line because they were no reference to + * `this` in the `tasks` array. This reference will be added (potentially many times) + * after the lock for making sure that no other thread uses it before we finished. + */ + assert merging == null; + merging = new ReentrantLock(); + merging.lock(); + try { + synchronized (tasks) { + assert tasks[index] == null; + tasks[index] = this; + } + /* + * We will check the 4 neighbors (top, left, right, bottom) for another `Task` instance + * that we can merge with `this`. We may recheck those 4 neighbors many times depending + * how many we found in previous iteration: + * + * - If `count == 4` we are done and the loop is stopped. + * - If `count == 0` we found no neighbor and abandon. Note that neighbors may appear later + * if they were still under computation at the time this method is invoked. A final check + * for late additions will be done sequentially in the last thread. + * - For all other counts, we merged some but not all neighbors. Maybe new neighbors became + * available during the time we did the merge, so the 4 neighbors will be checked again. + */ + int count = 0; +retry: for (int side=0;; side++) { // Break condition is inside the loop. + final boolean valid; // Whether `i` index is inside bounds. + final int i; // Neighbor index in the `tasks` array. + switch (side) { + default: { + if ((count & 3) == 0) break retry; // `count` == 0 or 4. + count = 0; + side = 0; // Continue (fallthrough) with case 0. + } + case 0: i = index - yStride; valid = (i >= 0); break; // Neighbor on top. + case 1: i = index - 1; valid = (i >= rowStart); break; // Neighbor on left. + case 2: i = index + 1; valid = (i < rowStart + yStride); break; // Neighbor on right. + case 3: i = index + yStride; valid = (i < tasks.length); break; // Neighbor on bottom. + } + if (valid) { + /* + * Index was computed without synchronization, but access to `tasks` elements + * need to be synchronized. If we find a neighbor that we can merge, replace + * immediately all instances by `this` so that no other thread will merge it. + * The `this` instance will be ignored by other threads until this method exits + * because we hold the `this.merging` lock. + */ + final Task neighbor; + synchronized (tasks) { + neighbor = tasks[i]; + if (neighbor == null || neighbor == this || !neighbor.merging.tryLock()) { + continue; // Ignore (for now) this neighbor, check next neighbor. + } + for (int j=0; j<tasks.length; j++) { + if (tasks[j] == neighbor) { + tasks[j] = this; + } + } + } + /* + * Do the actual merge between sub-tasks while we hold the lock on the two + * `Task` instances. The `neighbor` reference is discarded after this block. + */ + try { + merge(neighbor); + } finally { + neighbor.merging.unlock(); + } + count++; + } + } + } finally { + merging.unlock(); + } + /* + * If this thread is the last one, check if there is any unmerged `Task` instances. + * There is no more synchronization at this point because no other thread is using + * those objects; this final check is monothread and sequential. + */ + if (runningThreads.decrementAndGet() != 0) { + return null; + } + for (int i=0; i<tasks.length; i++) { + final Task other = tasks[i]; + if (other != null && other != this) { + assert !other.merging.isLocked(); + for (int j=i; j<tasks.length; j++) { + if (tasks[j] == other) { + tasks[j] = this; + } + } + merge(other); + } + } + return result(); + } + } +}
