[GOBBLIN-413] Use same compaction start time for time lookback check during compaction
Closes #2289 from yukuai518/compacttime Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/a7a85e15 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/a7a85e15 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/a7a85e15 Branch: refs/heads/0.12.0 Commit: a7a85e150474b8911b0b92114781a30105b77822 Parents: a3189d7 Author: Kuai Yu <[email protected]> Authored: Wed Feb 21 14:14:48 2018 -0800 Committer: Hung Tran <[email protected]> Committed: Wed Feb 21 14:14:48 2018 -0800 ---------------------------------------------------------------------- .../compaction/source/CompactionSource.java | 5 +- .../verify/CompactionTimeRangeVerifier.java | 7 +- .../gobblin/azkaban/AzkabanJobLauncher.java | 24 +++- .../runtime/listeners/CompositeJobListener.java | 133 +++++++++++++++++++ .../listeners/EmailNotificationJobListener.java | 2 + 5 files changed, 166 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a7a85e15/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java index 4e8d3e0..f11378f 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java @@ -75,6 +75,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; +import org.joda.time.DateTimeUtils; import java.io.IOException; import java.net.URI; @@ -94,6 +95,7 @@ import java.util.concurrent.atomic.AtomicBoolean; */ @Slf4j public class CompactionSource implements WorkUnitStreamSource<String, String> { + public static final String COMPACTION_INIT_TIME = "compaction.init.time"; private CompactionSuite suite; private Path tmpJobDir; private FileSystem fs; @@ -108,6 +110,7 @@ public class CompactionSource implements WorkUnitStreamSource<String, String> { public WorkUnitStream getWorkunitStream(SourceState state) { try { fs = getSourceFileSystem(state); + state.setProp(COMPACTION_INIT_TIME, DateTimeUtils.currentTimeMillis()); suite = CompactionSuiteUtils.getCompactionSuiteFactory(state).createSuite(state); initRequestAllocator(state); @@ -433,7 +436,7 @@ public class CompactionSource implements WorkUnitStreamSource<String, String> { } } - protected FileSystem getSourceFileSystem(State state) + public static FileSystem getSourceFileSystem(State state) throws IOException { Configuration conf = HadoopUtils.getConfFromState(state); String uri = state.getProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, ConfigurationKeys.LOCAL_FS_URI); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a7a85e15/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java index 85eca40..a267ab5 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java @@ -21,6 +21,7 @@ import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.gobblin.compaction.dataset.TimeBasedSubDirDatasetsFinder; import org.apache.gobblin.compaction.mapreduce.MRCompactor; import org.apache.gobblin.compaction.parser.CompactionPathParser; +import org.apache.gobblin.compaction.source.CompactionSource; import org.apache.gobblin.configuration.State; import org.apache.gobblin.dataset.FileSystemDataset; import lombok.AllArgsConstructor; @@ -50,19 +51,19 @@ public class CompactionTimeRangeVerifier implements CompactionVerifier<FileSyste CompactionPathParser.CompactionParserResult result = new CompactionPathParser(state).parse(dataset); DateTime folderTime = result.getTime(); DateTimeZone timeZone = DateTimeZone.forID(this.state.getProp(MRCompactor.COMPACTION_TIMEZONE, MRCompactor.DEFAULT_COMPACTION_TIMEZONE)); - DateTime current = new DateTime(timeZone); + DateTime compactionStartTime = new DateTime(this.state.getPropAsLong(CompactionSource.COMPACTION_INIT_TIME), timeZone); PeriodFormatter formatter = new PeriodFormatterBuilder().appendMonths().appendSuffix("m").appendDays().appendSuffix("d").appendHours() .appendSuffix("h").toFormatter(); // get earliest time String maxTimeAgoStr = this.state.getProp(TimeBasedSubDirDatasetsFinder.COMPACTION_TIMEBASED_MAX_TIME_AGO, TimeBasedSubDirDatasetsFinder.DEFAULT_COMPACTION_TIMEBASED_MAX_TIME_AGO); Period maxTimeAgo = formatter.parsePeriod(maxTimeAgoStr); - earliest = current.minus(maxTimeAgo); + earliest = compactionStartTime.minus(maxTimeAgo); // get latest time String minTimeAgoStr = this.state.getProp(TimeBasedSubDirDatasetsFinder.COMPACTION_TIMEBASED_MIN_TIME_AGO, TimeBasedSubDirDatasetsFinder.DEFAULT_COMPACTION_TIMEBASED_MIN_TIME_AGO); Period minTimeAgo = formatter.parsePeriod(minTimeAgoStr); - latest = current.minus(minTimeAgo); + latest = compactionStartTime.minus(minTimeAgo); if (earliest.isBefore(folderTime) && latest.isAfter(folderTime)) { log.debug("{} falls in the user defined time range", dataset.datasetRoot()); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a7a85e15/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java index 45b2f40..2a7d311 100644 --- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java +++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java @@ -34,6 +34,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.gobblin.runtime.job_catalog.PackagedTemplatesJobCatalogDecorator; +import org.apache.gobblin.runtime.listeners.CompositeJobListener; +import org.apache.gobblin.util.ClassAliasResolver; import org.apache.gobblin.util.ConfigUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; @@ -98,6 +100,7 @@ public class AzkabanJobLauncher extends AbstractJob implements ApplicationLaunch private static final Logger LOG = Logger.getLogger(AzkabanJobLauncher.class); public static final String GOBBLIN_LOG_LEVEL_KEY = "gobblin.log.levelOverride"; + public static final String GOBBLIN_CUSTOM_JOB_LISTENERS = "gobblin.custom.job.listeners"; public static final String TEMPLATE_KEY = "gobblin.template.uri"; private static final String HADOOP_FS_DEFAULT_NAME = "fs.default.name"; @@ -115,7 +118,8 @@ public class AzkabanJobLauncher extends AbstractJob implements ApplicationLaunch private final Closer closer = Closer.create(); private final JobLauncher jobLauncher; - private final JobListener jobListener = new EmailNotificationJobListener(); + private final JobListener jobListener; + private final Properties props; private final ApplicationLauncher applicationLauncher; private final long ownAzkabanSla; @@ -134,6 +138,9 @@ public class AzkabanJobLauncher extends AbstractJob implements ApplicationLaunch this.props = new Properties(); this.props.putAll(props); + // initialize job listeners after properties has been initialized + this.jobListener = initJobListener(); + // load dynamic configuration and add them to the job properties Config propsAsConfig = ConfigUtils.propertiesToConfig(props); DynamicConfigGenerator dynamicConfigGenerator = @@ -217,6 +224,21 @@ public class AzkabanJobLauncher extends AbstractJob implements ApplicationLaunch this.closer.register(new ServiceBasedAppLauncher(jobProps, "Azkaban-" + UUID.randomUUID())); } + private JobListener initJobListener() { + CompositeJobListener compositeJobListener = new CompositeJobListener(); + List<String> listeners = new State(props).getPropAsList(GOBBLIN_CUSTOM_JOB_LISTENERS, EmailNotificationJobListener.class.getSimpleName()); + try { + for (String listenerAlias: listeners) { + ClassAliasResolver<JobListener> conditionClassAliasResolver = new ClassAliasResolver<>(JobListener.class); + compositeJobListener.addJobListener(conditionClassAliasResolver.resolveClass(listenerAlias).newInstance()); + } + } catch (IllegalAccessException | InstantiationException | ClassNotFoundException e) { + throw new IllegalArgumentException(e); + } + + return compositeJobListener; + } + @Override public void run() throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a7a85e15/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/listeners/CompositeJobListener.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/listeners/CompositeJobListener.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/listeners/CompositeJobListener.java new file mode 100644 index 0000000..bfdc4c8 --- /dev/null +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/listeners/CompositeJobListener.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.runtime.listeners; + +import java.util.List; + +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.gobblin.runtime.JobContext; + +import com.google.common.collect.Lists; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + + +@Slf4j +@AllArgsConstructor +public class CompositeJobListener extends AbstractJobListener { + private List<JobListener> listeners = Lists.newArrayList(); + + public CompositeJobListener() { + } + + public void addJobListener(JobListener listener) { + this.listeners.add(listener); + } + + + @Override + public void onJobPrepare(JobContext jobContext) throws Exception { + StringBuffer buf = new StringBuffer(); + for (JobListener listener: listeners) { + try { + listener.onJobPrepare(jobContext); + } catch (Exception e) { + buf.append(listener.getClass().getName() + ":" + e.toString()); + log.error(ExceptionUtils.getFullStackTrace(e)); + } + } + + String exceptions = buf.toString(); + if (!exceptions.isEmpty()) { + throw new RuntimeException(exceptions); + } + } + + @Override + public void onJobStart(JobContext jobContext) throws Exception { + StringBuffer buf = new StringBuffer(); + for (JobListener listener: listeners) { + try { + listener.onJobStart(jobContext); + } catch (Exception e) { + buf.append(listener.getClass().getName() + ":" + e.toString()); + log.error(ExceptionUtils.getFullStackTrace(e)); + } + } + + String exceptions = buf.toString(); + if (!exceptions.isEmpty()) { + throw new RuntimeException(exceptions); + } + } + + @Override + public void onJobCompletion(JobContext jobContext) throws Exception { + StringBuffer buf = new StringBuffer(); + for (JobListener listener: listeners) { + try { + listener.onJobCompletion(jobContext); + } catch (Exception e) { + buf.append(listener.getClass().getName() + ":" + e.toString()); + log.error(ExceptionUtils.getFullStackTrace(e)); + } + } + + String exceptions = buf.toString(); + if (!exceptions.isEmpty()) { + throw new RuntimeException(exceptions); + } + } + + @Override + public void onJobCancellation(JobContext jobContext) throws Exception { + StringBuffer buf = new StringBuffer(); + for (JobListener listener: listeners) { + try { + listener.onJobCancellation(jobContext); + } catch (Exception e) { + buf.append(listener.getClass().getName() + ":" + e.toString()); + log.error(ExceptionUtils.getFullStackTrace(e)); + } + } + + String exceptions = buf.toString(); + if (!exceptions.isEmpty()) { + throw new RuntimeException(exceptions); + } + } + + @Override + public void onJobFailure(JobContext jobContext) throws Exception { + StringBuffer buf = new StringBuffer(); + for (JobListener listener: listeners) { + try { + listener.onJobFailure(jobContext); + } catch (Exception e) { + buf.append(listener.getClass().getName() + ":" + e.toString()); + log.error(ExceptionUtils.getFullStackTrace(e)); + } + } + + String exceptions = buf.toString(); + if (!exceptions.isEmpty()) { + throw new RuntimeException(exceptions); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a7a85e15/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/listeners/EmailNotificationJobListener.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/listeners/EmailNotificationJobListener.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/listeners/EmailNotificationJobListener.java index 3106f4d..feb0d92 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/listeners/EmailNotificationJobListener.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/listeners/EmailNotificationJobListener.java @@ -18,6 +18,7 @@ package org.apache.gobblin.runtime.listeners; import org.apache.commons.mail.EmailException; +import org.apache.gobblin.annotation.Alias; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,6 +33,7 @@ import org.apache.gobblin.util.EmailUtils; * * @author Yinan Li */ +@Alias("EmailNotificationJobListener") public class EmailNotificationJobListener extends AbstractJobListener { private static final Logger LOGGER = LoggerFactory.getLogger(EmailNotificationJobListener.class);
