[GOBBLIN-391] Use the DataPublisherFactory to allow sharing publisher…
Closes #2267 from htran1/share_publisher Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/378ccaa8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/378ccaa8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/378ccaa8 Branch: refs/heads/0.12.0 Commit: 378ccaa8a253a1eda873ffbe74300c6bf8a755e8 Parents: 41fd2b9 Author: Hung Tran <[email protected]> Authored: Fri Jan 26 11:45:31 2018 -0800 Committer: Hung Tran <[email protected]> Committed: Fri Jan 26 11:45:31 2018 -0800 ---------------------------------------------------------------------- .../gobblin/publisher/DataPublisherFactory.java | 12 ++- .../gobblin/runtime/SafeDatasetCommit.java | 27 +++-- .../runtime/mapreduce/MRJobLauncherTest.java | 103 +++++++++++++++++++ 3 files changed, 133 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/378ccaa8/gobblin-core/src/main/java/org/apache/gobblin/publisher/DataPublisherFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/publisher/DataPublisherFactory.java b/gobblin-core/src/main/java/org/apache/gobblin/publisher/DataPublisherFactory.java index 4e565ad..8d77fd6 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/publisher/DataPublisherFactory.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/publisher/DataPublisherFactory.java @@ -54,6 +54,16 @@ public class DataPublisherFactory<S extends ScopeType<S>> } } + /** + * Is the publisher cacheable in the SharedResourcesBroker? + * @param publisher + * @return true if cacheable, else false + */ + public static boolean isPublisherCacheable(DataPublisher publisher) { + // only threadsafe publishers are cacheable. 
non-threadsafe publishers are marked immediately for invalidation + return publisher.supportsCapability(Capability.THREADSAFE, Collections.EMPTY_MAP); + } + @Override public String getName() { return FACTORY_NAME; @@ -75,7 +85,7 @@ public class DataPublisherFactory<S extends ScopeType<S>> // by the broker. // Otherwise, it is not shareable, so return it as an immediately invalidated resource that will only be returned // once from the broker. - if (publisher.supportsCapability(Capability.THREADSAFE, Collections.EMPTY_MAP)) { + if (isPublisherCacheable(publisher)) { return new ResourceInstance<>(publisher); } else { return new ImmediatelyInvalidResourceEntry<>(publisher); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/378ccaa8/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java index 43e5c59..7e2e9fa 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java @@ -44,6 +44,7 @@ import org.apache.gobblin.metrics.event.FailureEventBuilder; import org.apache.gobblin.metrics.event.lineage.LineageInfo; import org.apache.gobblin.publisher.CommitSequencePublisher; import org.apache.gobblin.publisher.DataPublisher; +import org.apache.gobblin.publisher.DataPublisherFactory; import org.apache.gobblin.publisher.UnpublishedHandling; import org.apache.gobblin.runtime.commit.DatasetStateCommitStep; import org.apache.gobblin.runtime.task.TaskFactory; @@ -133,9 +134,23 @@ final class SafeDatasetCommit implements Callable<Void> { } generateCommitSequenceBuilder(this.datasetState, entry.getValue()); } else { - DataPublisher publisher = taskFactory == null ? 
closer - .register(DataPublisher.getInstance(dataPublisherClass, this.jobContext.getJobState())) - : taskFactory.createDataPublisher(this.datasetState); + DataPublisher publisher; + + if (taskFactory == null) { + publisher = DataPublisherFactory.get(dataPublisherClass.getName(), this.jobContext.getJobState(), + this.jobContext.getJobBroker()); + + // non-threadsafe publishers are not shareable and are not retained in the broker, so register them with + // the closer + if (!DataPublisherFactory.isPublisherCacheable(publisher)) { + closer.register(publisher); + } + } else { + // NOTE: sharing of publishers is not supported when they are instantiated through the TaskFactory. + // This should be revisited if sharing is required. + publisher = taskFactory.createDataPublisher(this.datasetState); + } + if (this.isJobCancelled) { if (publisher.canBeSkipped()) { log.warn(publisher.getClass() + " will be skipped."); @@ -160,11 +175,7 @@ final class SafeDatasetCommit implements Callable<Void> { this.datasetState.setState(JobState.RunningState.COMMITTED); } } - } catch (ReflectiveOperationException roe) { - log.error(String.format("Failed to instantiate data publisher for dataset %s of job %s.", this.datasetUrn, - this.jobContext.getJobId()), roe); - throw new RuntimeException(roe); - } catch (Throwable throwable) { + } catch (Throwable throwable) { log.error(String.format("Failed to commit dataset state for dataset %s of job %s", this.datasetUrn, this.jobContext.getJobId()), throwable); throw new RuntimeException(throwable); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/378ccaa8/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java index 8d4f308..e8e996e 100644 
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java @@ -20,7 +20,10 @@ package org.apache.gobblin.runtime.mapreduce; import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.util.Collection; +import java.util.Map; import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.io.FileUtils; import org.jboss.byteman.contrib.bmunit.BMNGRunner; @@ -37,14 +40,17 @@ import com.google.common.collect.ImmutableMap; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import org.apache.gobblin.capability.Capability; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.DynamicConfigGenerator; +import org.apache.gobblin.configuration.State; import org.apache.gobblin.configuration.WorkUnitState; import org.apache.gobblin.metastore.FsStateStore; import org.apache.gobblin.metastore.StateStore; import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase; import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory; import org.apache.gobblin.metrics.GobblinMetrics; +import org.apache.gobblin.publisher.DataPublisher; import org.apache.gobblin.runtime.JobLauncherTestHelper; import org.apache.gobblin.runtime.JobState; import org.apache.gobblin.util.limiter.BaseLimiterType; @@ -360,6 +366,55 @@ public class MRJobLauncherTest extends BMNGRunner { } } + @Test + public void testLaunchJobWithNonThreadsafeDataPublisher() throws Exception { + final Logger log = LoggerFactory.getLogger(getClass().getName() + ".testLaunchJobWithNonThreadsafeDataPublisher"); + log.info("in"); + Properties jobProps = loadJobProps(); + jobProps.setProperty(ConfigurationKeys.JOB_NAME_KEY, + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY) + "-testLaunchJobWithNonThreadsafeDataPublisher"); + 
jobProps.setProperty(ConfigurationKeys.JOB_DATA_PUBLISHER_TYPE, TestNonThreadsafeDataPublisher.class.getName()); + + // make sure the count starts from 0 + TestNonThreadsafeDataPublisher.instantiatedCount.set(0); + + try { + this.jobLauncherTestHelper.runTestWithMultipleDatasets(jobProps); + } finally { + this.jobLauncherTestHelper.deleteStateStore(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)); + } + + // A different publisher is used for each dataset + Assert.assertEquals(TestNonThreadsafeDataPublisher.instantiatedCount.get(), 4); + + log.info("out"); + } + + @Test + public void testLaunchJobWithThreadsafeDataPublisher() throws Exception { + final Logger log = LoggerFactory.getLogger(getClass().getName() + ".testLaunchJobWithThreadsafeDataPublisher"); + log.info("in"); + Properties jobProps = loadJobProps(); + jobProps.setProperty(ConfigurationKeys.JOB_NAME_KEY, + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY) + "-testLaunchJobWithThreadsafeDataPublisher"); + jobProps.setProperty(ConfigurationKeys.JOB_DATA_PUBLISHER_TYPE, TestThreadsafeDataPublisher.class.getName()); + + // make sure the count starts from 0 + TestThreadsafeDataPublisher.instantiatedCount.set(0); + + try { + this.jobLauncherTestHelper.runTestWithMultipleDatasets(jobProps); + } finally { + this.jobLauncherTestHelper.deleteStateStore(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)); + } + + // The same publisher is used for all the data sets + Assert.assertEquals(TestThreadsafeDataPublisher.instantiatedCount.get(), 1); + + log.info("out"); + } + + @AfterClass(alwaysRun = true) public void tearDown() throws IOException { if (testMetastoreDatabase != null) { @@ -390,4 +445,52 @@ public class MRJobLauncherTest extends BMNGRunner { JobLauncherTestHelper.DYNAMIC_VALUE1)); } } + + public static class TestNonThreadsafeDataPublisher extends DataPublisher { + // for counting how many times the object is instantiated in the test case + static AtomicInteger instantiatedCount = new 
AtomicInteger(0); + + public TestNonThreadsafeDataPublisher(State state) { + super(state); + instantiatedCount.incrementAndGet(); + } + + @Override + public void initialize() throws IOException { + } + + @Override + public void publishData(Collection<? extends WorkUnitState> states) throws IOException { + for (WorkUnitState workUnitState : states) { + // Upon successfully committing the data to the final output directory, set states + // of successful tasks to COMMITTED. leaving states of unsuccessful ones unchanged. + // This makes sense to the COMMIT_ON_PARTIAL_SUCCESS policy. + workUnitState.setWorkingState(WorkUnitState.WorkingState.COMMITTED); + } + } + + @Override + public void publishMetadata(Collection<? extends WorkUnitState> states) throws IOException { + } + + @Override + public void close() throws IOException { + } + + @Override + public boolean supportsCapability(Capability c, Map<String, Object> properties) { + return c == DataPublisher.REUSABLE; + } + } + + public static class TestThreadsafeDataPublisher extends TestNonThreadsafeDataPublisher { + public TestThreadsafeDataPublisher(State state) { + super(state); + } + + @Override + public boolean supportsCapability(Capability c, Map<String, Object> properties) { + return (c == Capability.THREADSAFE || c == DataPublisher.REUSABLE); + } + } }
