This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit a68dc3c440c79c031d77d0082b7f8e4707311382 Author: Piotr Nowojski <[email protected]> AuthorDate: Fri Jul 5 16:50:57 2019 +0200 [hotfix][test] Guarantee order of CloseableRegistry --- .../java/org/apache/flink/core/fs/CloseableRegistry.java | 16 ++++++++++++++-- .../org/apache/flink/util/AbstractCloseableRegistry.java | 8 ++++++-- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java index 5f1c9fb..07e8b5b 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java @@ -24,7 +24,10 @@ import org.apache.flink.util.AbstractCloseableRegistry; import javax.annotation.Nonnull; import java.io.Closeable; -import java.util.HashMap; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; import java.util.Map; /** @@ -33,6 +36,8 @@ import java.util.Map; * <p>Registering to an already closed registry will throw an exception and close the provided {@link Closeable} * * <p>All methods in this class are thread-safe. + * + * <p>This class closes all registered {@link Closeable}s in the reverse registration order. 
*/ @Internal public class CloseableRegistry extends AbstractCloseableRegistry<Closeable, Object> { @@ -40,7 +45,7 @@ public class CloseableRegistry extends AbstractCloseableRegistry<Closeable, Obje private static final Object DUMMY = new Object(); public CloseableRegistry() { - super(new HashMap<>()); + super(new LinkedHashMap<>()); } @Override @@ -52,4 +57,11 @@ public class CloseableRegistry extends AbstractCloseableRegistry<Closeable, Obje protected boolean doUnRegister(@Nonnull Closeable closeable, @Nonnull Map<Closeable, Object> closeableMap) { return closeableMap.remove(closeable) != null; } + + @Override + protected Collection<Closeable> getReferencesToClose() { + ArrayList<Closeable> closeablesToClose = new ArrayList<>(closeableToRef.keySet()); + Collections.reverse(closeablesToClose); + return closeablesToClose; + } } diff --git a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java index e6589f6..ff4febd 100644 --- a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java +++ b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java @@ -49,7 +49,7 @@ public abstract class AbstractCloseableRegistry<C extends Closeable, T> implemen /** Map from tracked Closeables to some associated meta data. */ @GuardedBy("lock") - private final Map<Closeable, T> closeableToRef; + protected final Map<Closeable, T> closeableToRef; /** Indicates if this registry is closed. 
*/ @GuardedBy("lock") @@ -114,7 +114,7 @@ public abstract class AbstractCloseableRegistry<C extends Closeable, T> implemen closed = true; - toCloseCopy = new ArrayList<>(closeableToRef.keySet()); + toCloseCopy = getReferencesToClose(); closeableToRef.clear(); } @@ -128,6 +128,10 @@ public abstract class AbstractCloseableRegistry<C extends Closeable, T> implemen } } + protected Collection<Closeable> getReferencesToClose() { + return new ArrayList<>(closeableToRef.keySet()); + } + /** * Does the actual registration of the closeable with the registry map. This should not do any long running or * potentially blocking operations as it is executed under the registry's lock.
