Repository: incubator-gobblin Updated Branches: refs/heads/master ae0ba2815 -> 8284bb76b
[GOBBLIN-264] Add a SharedResourceFactory for creating shared DataPub… Closes #2116 from htran1/shareable_publishers Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/8284bb76 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/8284bb76 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/8284bb76 Branch: refs/heads/master Commit: 8284bb76bac89c0e15186dbf75717e6ca831eab0 Parents: ae0ba28 Author: Hung Tran <[email protected]> Authored: Thu Sep 28 09:59:21 2017 -0700 Committer: Hung Tran <[email protected]> Committed: Thu Sep 28 09:59:21 2017 -0700 ---------------------------------------------------------------------- .../apache/gobblin/capability/Capability.java | 40 ++++++ .../gobblin/capability/CapabilityAware.java | 38 ++++++ .../apache/gobblin/publisher/DataPublisher.java | 14 +- .../gobblin/publisher/DataPublisherFactory.java | 92 +++++++++++++ .../gobblin/publisher/DataPublisherKey.java | 64 +++++++++ .../publisher/DataPublisherFactoryTest.java | 134 +++++++++++++++++++ .../broker/ImmediatelyInvalidResourceEntry.java | 56 ++++++++ .../apache/gobblin/broker/ResourceInstance.java | 15 ++- .../org/apache/gobblin/util/ParallelRunner.java | 80 ++++++----- 9 files changed, 501 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8284bb76/gobblin-api/src/main/java/org/apache/gobblin/capability/Capability.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/capability/Capability.java b/gobblin-api/src/main/java/org/apache/gobblin/capability/Capability.java new file mode 100644 index 0000000..02ee89a --- /dev/null +++ b/gobblin-api/src/main/java/org/apache/gobblin/capability/Capability.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.capability; + +import org.apache.gobblin.annotation.Alpha; + +import lombok.Data; + +/** + * Represents a set of functionality a job-creator can ask for. Examples could include + * encryption, compression, partitioning... + * + * Each Capability has a name and then a set of associated configuration properties. An example is + * the encryption algorithm to use. + */ +@Alpha +@Data +public class Capability { + /** + * Threadsafe capability. 
+ */ + public static final Capability THREADSAFE = new Capability("THREADSAFE", false); + + private final String name; + private final boolean critical; +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8284bb76/gobblin-api/src/main/java/org/apache/gobblin/capability/CapabilityAware.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/capability/CapabilityAware.java b/gobblin-api/src/main/java/org/apache/gobblin/capability/CapabilityAware.java new file mode 100644 index 0000000..2b56469 --- /dev/null +++ b/gobblin-api/src/main/java/org/apache/gobblin/capability/CapabilityAware.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.capability; + +import java.util.Map; + +import org.apache.gobblin.annotation.Alpha; + +/** + * Describes an object that is aware of the capabilities it supports. + */ +@Alpha +public interface CapabilityAware { + /** + * Checks if this object supports the given Capability with the given properties. + * + * Implementers of this should always check if their super-class may happen to support a capability + * before returning false! 
+ * @param c Capability being queried + * @param properties Properties specific to the capability. Properties are capability specific. + * @return True if this object supports the given capability + property settings, false if not + */ + boolean supportsCapability(Capability c, Map<String, Object> properties); +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8284bb76/gobblin-api/src/main/java/org/apache/gobblin/publisher/DataPublisher.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/publisher/DataPublisher.java b/gobblin-api/src/main/java/org/apache/gobblin/publisher/DataPublisher.java index a190d4a..4a551df 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/publisher/DataPublisher.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/publisher/DataPublisher.java @@ -21,7 +21,10 @@ import java.io.Closeable; import java.io.IOException; import java.lang.reflect.Constructor; import java.util.Collection; +import java.util.Map; +import org.apache.gobblin.capability.Capability; +import org.apache.gobblin.capability.CapabilityAware; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.State; import org.apache.gobblin.configuration.WorkUnitState; @@ -30,7 +33,11 @@ import org.apache.gobblin.configuration.WorkUnitState; /** * Defines how to publish data and its corresponding metadata. Can be used for either task level or job level publishing. */ -public abstract class DataPublisher implements Closeable { +public abstract class DataPublisher implements Closeable, CapabilityAware { + /** + * Reusable capability. 
+ */ + public static final Capability REUSABLE = new Capability("REUSABLE", false); protected final State state; @@ -125,4 +132,9 @@ public abstract class DataPublisher implements Closeable { protected boolean shouldPublishMetadataFirst() { return true; } + + @Override + public boolean supportsCapability(Capability c, Map<String, Object> properties) { + return false; + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8284bb76/gobblin-core/src/main/java/org/apache/gobblin/publisher/DataPublisherFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/publisher/DataPublisherFactory.java b/gobblin-core/src/main/java/org/apache/gobblin/publisher/DataPublisherFactory.java new file mode 100644 index 0000000..4e565ad --- /dev/null +++ b/gobblin-core/src/main/java/org/apache/gobblin/publisher/DataPublisherFactory.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.publisher; + +import java.io.IOException; +import java.util.Collections; + +import org.apache.gobblin.broker.ImmediatelyInvalidResourceEntry; +import org.apache.gobblin.broker.ResourceInstance; +import org.apache.gobblin.broker.iface.ConfigView; +import org.apache.gobblin.broker.iface.NotConfiguredException; +import org.apache.gobblin.broker.iface.ScopeType; +import org.apache.gobblin.broker.iface.ScopedConfigView; +import org.apache.gobblin.broker.iface.SharedResourceFactory; +import org.apache.gobblin.broker.iface.SharedResourceFactoryResponse; +import org.apache.gobblin.broker.iface.SharedResourcesBroker; +import org.apache.gobblin.capability.Capability; +import org.apache.gobblin.configuration.State; + +import lombok.extern.slf4j.Slf4j; + +/** + * A {@link SharedResourceFactory} for creating {@link DataPublisher}s. + * + * The factory creates a {@link DataPublisher} with the publisher class name and state. + */ +@Slf4j +public class DataPublisherFactory<S extends ScopeType<S>> + implements SharedResourceFactory<DataPublisher, DataPublisherKey, S> { + + public static final String FACTORY_NAME = "dataPublisher"; + + public static <S extends ScopeType<S>> DataPublisher get(String publisherClassName, State state, + SharedResourcesBroker<S> broker) throws IOException { + try { + return broker.getSharedResource(new DataPublisherFactory<S>(), new DataPublisherKey(publisherClassName, state)); + } catch (NotConfiguredException nce) { + throw new IOException(nce); + } + } + + @Override + public String getName() { + return FACTORY_NAME; + } + + @Override + public SharedResourceFactoryResponse<DataPublisher> createResource(SharedResourcesBroker<S> broker, + ScopedConfigView<S, DataPublisherKey> config) throws NotConfiguredException { + try { + DataPublisherKey key = config.getKey(); + String publisherClassName = key.getPublisherClassName(); + State state = key.getState(); + Class<? extends DataPublisher> dataPublisherClass = (Class<? 
extends DataPublisher>) Class + .forName(publisherClassName); + + DataPublisher publisher = DataPublisher.getInstance(dataPublisherClass, state); + + // If the publisher is threadsafe then it is shareable, so return it as a resource instance that may be cached + // by the broker. + // Otherwise, it is not shareable, so return it as an immediately invalidated resource that will only be returned + // once from the broker. + if (publisher.supportsCapability(Capability.THREADSAFE, Collections.EMPTY_MAP)) { + return new ResourceInstance<>(publisher); + } else { + return new ImmediatelyInvalidResourceEntry<>(publisher); + } + } catch (ReflectiveOperationException e) { + throw new RuntimeException(e); + } + } + + @Override + public S getAutoScope(SharedResourcesBroker<S> broker, ConfigView<S, DataPublisherKey> config) { + return broker.selfScope().getType().rootScope(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8284bb76/gobblin-core/src/main/java/org/apache/gobblin/publisher/DataPublisherKey.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/publisher/DataPublisherKey.java b/gobblin-core/src/main/java/org/apache/gobblin/publisher/DataPublisherKey.java new file mode 100644 index 0000000..fca1ba1 --- /dev/null +++ b/gobblin-core/src/main/java/org/apache/gobblin/publisher/DataPublisherKey.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.publisher; + +import org.apache.gobblin.broker.iface.SharedResourceKey; +import org.apache.gobblin.configuration.State; + +import lombok.Getter; + + +/** + * {@link SharedResourceKey} for requesting {@link DataPublisher}s from a + * {@link org.apache.gobblin.broker.iface.SharedResourceFactory + */ +@Getter +public class DataPublisherKey implements SharedResourceKey { + private final String publisherClassName; + private final State state; + + public DataPublisherKey(String publisherClassName, State state) { + this.publisherClassName = publisherClassName; + this.state = state; + } + + @Override + public String toConfigurationKey() { + return this.publisherClassName; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + DataPublisherKey that = (DataPublisherKey) o; + + return publisherClassName == null ? + that.publisherClassName == null : publisherClassName.equals(that.publisherClassName); + } + + @Override + public int hashCode() { + return publisherClassName != null ? 
publisherClassName.hashCode() : 0; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8284bb76/gobblin-core/src/test/java/org/apache/gobblin/publisher/DataPublisherFactoryTest.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/java/org/apache/gobblin/publisher/DataPublisherFactoryTest.java b/gobblin-core/src/test/java/org/apache/gobblin/publisher/DataPublisherFactoryTest.java new file mode 100644 index 0000000..b2cd739 --- /dev/null +++ b/gobblin-core/src/test/java/org/apache/gobblin/publisher/DataPublisherFactoryTest.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.publisher; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; + +import org.testng.Assert; +import org.testng.annotations.Test; + +import com.typesafe.config.ConfigFactory; + +import org.apache.gobblin.broker.SharedResourcesBrokerFactory; +import org.apache.gobblin.broker.SimpleScope; +import org.apache.gobblin.broker.SimpleScopeType; +import org.apache.gobblin.broker.iface.NoSuchScopeException; +import org.apache.gobblin.broker.iface.NotConfiguredException; +import org.apache.gobblin.broker.iface.SharedResourcesBroker; +import org.apache.gobblin.capability.Capability; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.configuration.WorkUnitState; + +/** + * Tests for DataPublisherFactory + */ +public class DataPublisherFactoryTest { + + @Test + public void testGetNonThreadSafePublisher() + throws IOException { + SharedResourcesBroker broker = + SharedResourcesBrokerFactory.<SimpleScopeType>createDefaultTopLevelBroker(ConfigFactory.empty(), + SimpleScopeType.GLOBAL.defaultScopeInstance()); + + DataPublisher publisher1 = DataPublisherFactory.get(TestNonThreadsafeDataPublisher.class.getName(), null, broker); + DataPublisher publisher2 = DataPublisherFactory.get(TestNonThreadsafeDataPublisher.class.getName(), null, broker); + + // should get different publishers + Assert.assertNotEquals(publisher1, publisher2); + + // Check capabilities + Assert.assertTrue(publisher1.supportsCapability(DataPublisher.REUSABLE, Collections.EMPTY_MAP)); + Assert.assertFalse(publisher1.supportsCapability(Capability.THREADSAFE, Collections.EMPTY_MAP)); + } + + @Test + public void testGetThreadSafePublisher() + throws IOException, NotConfiguredException, NoSuchScopeException { + SharedResourcesBroker<SimpleScopeType> broker = + SharedResourcesBrokerFactory.<SimpleScopeType>createDefaultTopLevelBroker(ConfigFactory.empty(), + SimpleScopeType.GLOBAL.defaultScopeInstance()); 
+ + SharedResourcesBroker<SimpleScopeType> localBroker1 = + broker.newSubscopedBuilder(new SimpleScope<>(SimpleScopeType.LOCAL, "local1")).build(); + + DataPublisher publisher1 = DataPublisherFactory.get(TestThreadsafeDataPublisher.class.getName(), null, broker); + DataPublisher publisher2 = DataPublisherFactory.get(TestThreadsafeDataPublisher.class.getName(), null, broker); + + // should get the same publisher + Assert.assertEquals(publisher1, publisher2); + + DataPublisher publisher3 = + localBroker1.getSharedResource(new DataPublisherFactory<>(), + new DataPublisherKey(TestThreadsafeDataPublisher.class.getName(), null)); + + // should get the same publisher + Assert.assertEquals(publisher2, publisher3); + + DataPublisher publisher4 = + localBroker1.getSharedResourceAtScope(new DataPublisherFactory<>(), + new DataPublisherKey(TestThreadsafeDataPublisher.class.getName(), null), SimpleScopeType.LOCAL); + + // should get a different publisher + Assert.assertNotEquals(publisher3, publisher4); + + // Check capabilities + Assert.assertTrue(publisher1.supportsCapability(DataPublisher.REUSABLE, Collections.EMPTY_MAP)); + Assert.assertTrue(publisher1.supportsCapability(Capability.THREADSAFE, Collections.EMPTY_MAP)); + } + + private static class TestNonThreadsafeDataPublisher extends DataPublisher { + public TestNonThreadsafeDataPublisher(State state) { + super(state); + } + + @Override + public void initialize() throws IOException { + } + + @Override + public void publishData(Collection<? extends WorkUnitState> states) throws IOException { + } + + @Override + public void publishMetadata(Collection<? 
extends WorkUnitState> states) throws IOException { + } + + @Override + public void close() throws IOException { + } + + @Override + public boolean supportsCapability(Capability c, Map<String, Object> properties) { + return c == DataPublisher.REUSABLE; + } + } + + private static class TestThreadsafeDataPublisher extends TestNonThreadsafeDataPublisher { + public TestThreadsafeDataPublisher(State state) { + super(state); + } + + @Override + public boolean supportsCapability(Capability c, Map<String, Object> properties) { + return (c == Capability.THREADSAFE || c == DataPublisher.REUSABLE); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8284bb76/gobblin-utility/src/main/java/org/apache/gobblin/broker/ImmediatelyInvalidResourceEntry.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/broker/ImmediatelyInvalidResourceEntry.java b/gobblin-utility/src/main/java/org/apache/gobblin/broker/ImmediatelyInvalidResourceEntry.java new file mode 100644 index 0000000..b3f8502 --- /dev/null +++ b/gobblin-utility/src/main/java/org/apache/gobblin/broker/ImmediatelyInvalidResourceEntry.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.broker; + +import lombok.EqualsAndHashCode; +import lombok.extern.slf4j.Slf4j; + + +/** + * A {@link ResourceEntry} that expires immediately. The resource is not closed on invalidation since the lifetime of + * the object cannot be determined by the cache, so the recipient of the resource needs to close it. + */ +@Slf4j +@EqualsAndHashCode(callSuper = true) +public class ImmediatelyInvalidResourceEntry<T> extends ResourceInstance<T> { + private boolean valid; + + public ImmediatelyInvalidResourceEntry(T resource) { + super(resource); + this.valid = true; + } + + @Override + public T getResource() { + // mark the object as invalid before returning so that a new one will be created on the next + // request from the factory + this.valid = false; + + return super.getResource(); + } + + @Override + public boolean isValid() { + return this.valid; + } + + @Override + public void onInvalidate() { + // these type of resource cannot be closed on invalidation since the lifetime can't be determined + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8284bb76/gobblin-utility/src/main/java/org/apache/gobblin/broker/ResourceInstance.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/broker/ResourceInstance.java b/gobblin-utility/src/main/java/org/apache/gobblin/broker/ResourceInstance.java index e0f8611..b5f04c3 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/broker/ResourceInstance.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/broker/ResourceInstance.java @@ -26,7 +26,20 @@ import lombok.Data; */ @Data public class ResourceInstance<T> implements ResourceEntry<T> { - private final T resource; + // Note: the name here is theResource instead of resource since to avoid a collision of the lombok generated getter + 
// and the getResource() method defined in {@link ResourceEntry}. The collision results in unintended side effects + // when getResource() is overridden since it may have additional logic that should not be executed when the value of + // this field is fetched using the getter, such as in the Lombok generated toString(). + private final T theResource; + + /** + * This method returns the resource, but may have logic before the return. + * @return the resource + */ + @Override + public T getResource() { + return getTheResource(); + } @Override public boolean isValid() { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8284bb76/gobblin-utility/src/main/java/org/apache/gobblin/util/ParallelRunner.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/ParallelRunner.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/ParallelRunner.java index c112d5b..e2f4964 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/ParallelRunner.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/ParallelRunner.java @@ -18,6 +18,8 @@ package org.apache.gobblin.util; import lombok.Data; +import lombok.Getter; +import lombok.Setter; import java.io.Closeable; import java.io.IOException; @@ -83,7 +85,14 @@ public class ParallelRunner implements Closeable { public static final int DEFAULT_PARALLEL_RUNNER_THREADS = 10; private final ExecutorService executor; - private final FileSystem fs; + + /** + * Setting of fs is allowed to support reusing the {@link ParallelRunner} with different {@link FileSystem}s + * after all tasks have completed execution. 
+ */ + @Getter + @Setter + private FileSystem fs; private final List<NamedFuture> futures = Lists.newArrayList(); @@ -341,38 +350,49 @@ public class ParallelRunner implements Closeable { this.futures.add(new NamedFuture(this.executor.submit(callable), name)); } - @Override - public void close() throws IOException { + /** + * Wait for all submitted tasks to complete. The {@link ParallelRunner} can be reused after this call. + * @throws IOException + */ + public void waitForTasks() throws IOException { // Wait for all submitted tasks to complete - try { - boolean wasInterrupted = false; - IOException exception = null; - for (NamedFuture future : this.futures) { - try { - if (wasInterrupted) { - future.getFuture().cancel(true); - } else { - future.getFuture().get(); - } - } catch (InterruptedException ie) { - LOGGER.warn("Task was interrupted: " + future.getName()); - wasInterrupted = true; - if (exception == null) { - exception = new IOException(ie); - } - } catch (ExecutionException ee) { - LOGGER.warn("Task failed: " + future.getName(), ee.getCause()); - if (exception == null) { - exception = new IOException(ee.getCause()); - } + boolean wasInterrupted = false; + IOException exception = null; + for (NamedFuture future : this.futures) { + try { + if (wasInterrupted) { + future.getFuture().cancel(true); + } else { + future.getFuture().get(); + } + } catch (InterruptedException ie) { + LOGGER.warn("Task was interrupted: " + future.getName()); + wasInterrupted = true; + if (exception == null) { + exception = new IOException(ie); + } + } catch (ExecutionException ee) { + LOGGER.warn("Task failed: " + future.getName(), ee.getCause()); + if (exception == null) { + exception = new IOException(ee.getCause()); } } - if (wasInterrupted) { - Thread.currentThread().interrupt(); - } - if (exception != null && this.failPolicy == FailPolicy.FAIL_ONE_FAIL_ALL) { - throw exception; - } + } + if (wasInterrupted) { + Thread.currentThread().interrupt(); + } + if (exception != null && 
this.failPolicy == FailPolicy.FAIL_ONE_FAIL_ALL) { + throw exception; + } + + // clear so that more tasks can be submitted to this ParallelRunner + futures.clear(); + } + + @Override + public void close() throws IOException { + try { + waitForTasks(); } finally { ExecutorsUtils.shutdownExecutorService(this.executor, Optional.of(LOGGER)); }
