Repository: incubator-gobblin Updated Branches: refs/heads/master d323f6022 -> 5b8f7dfb1
[GOBBLIN-423] Add record count limit to salesforce source Closes #2300 from yukuai518/limit Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/5b8f7dfb Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/5b8f7dfb Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/5b8f7dfb Branch: refs/heads/master Commit: 5b8f7dfb10fa8eddaedfbb13df6adcaf018acf65 Parents: d323f60 Author: Kuai Yu <[email protected]> Authored: Thu Mar 22 16:47:59 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Thu Mar 22 16:47:59 2018 -0700 ---------------------------------------------------------------------- .../configuration/ConfigurationKeys.java | 7 +- .../java/org/apache/gobblin/source/Source.java | 9 ++ .../apache/gobblin/cluster/GobblinHelixJob.java | 16 +-- .../cluster/GobblinHelixJobScheduler.java | 119 +++++++++++++++---- .../extractor/extract/AbstractSource.java | 5 + .../source/extractor/extract/ExtractType.java | 8 +- .../extractor/extract/QueryBasedSource.java | 2 +- .../source/extractor/partition/Partitioner.java | 28 +++-- .../source/extractor/watermark/Watermark.java | 12 +- .../gobblin/runtime/AbstractJobLauncher.java | 4 + .../org/apache/gobblin/runtime/JobLauncher.java | 4 + .../apache/gobblin/runtime/SourceDecorator.java | 5 + .../apache/gobblin/scheduler/JobScheduler.java | 39 ++++-- .../gobblin/salesforce/SalesforceSource.java | 72 +++++++++-- 14 files changed, 260 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5b8f7dfb/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java index 612fd8b..70459a2 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java @@ -88,6 +88,10 @@ public class ConfigurationKeys { /** * Job scheduler configuration properties. */ + // Job retriggering + public static final String JOB_RETRIGGERING_ENABLED = "job.retriggering.enabled"; + public static final String DEFAULT_JOB_RETRIGGERING_ENABLED = "true"; + // Job executor thread pool size public static final String JOB_EXECUTOR_THREAD_POOL_SIZE_KEY = "jobexecutor.threadpool.size"; public static final int DEFAULT_JOB_EXECUTOR_THREAD_POOL_SIZE = 5; @@ -473,6 +477,8 @@ public class ConfigurationKeys { public static final String SOURCE_MAX_NUMBER_OF_PARTITIONS = "source.max.number.of.partitions"; public static final String SOURCE_SKIP_FIRST_RECORD = "source.skip.first.record"; public static final String SOURCE_COLUMN_NAME_CASE = "source.column.name.case"; + public static final String SOURCE_EARLY_STOP_ENABLED = "source.earlyStop.enabled"; + public static final boolean DEFAULT_SOURCE_EARLY_STOP_ENABLED = false; /** * Configuration properties used by the QueryBasedExtractor. 
@@ -583,7 +589,6 @@ public class ConfigurationKeys { public static final String DEFAULT_SOURCE_QUERYBASED_IS_METADATA_COLUMN_CHECK_ENABLED = "true"; public static final String DEFAULT_COLUMN_NAME_CASE = "NOCHANGE"; public static final int DEFAULT_SOURCE_QUERYBASED_JDBC_RESULTSET_FETCH_SIZE = 1000; - public static final String FILEBASED_REPORT_STATUS_ON_COUNT = "filebased.report.status.on.count"; public static final int DEFAULT_FILEBASED_REPORT_STATUS_ON_COUNT = 10000; public static final String DEFAULT_SOURCE_TIMEZONE = PST_TIMEZONE_NAME; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5b8f7dfb/gobblin-api/src/main/java/org/apache/gobblin/source/Source.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/source/Source.java b/gobblin-api/src/main/java/org/apache/gobblin/source/Source.java index 1edfd8d..37e73c8 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/source/Source.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/source/Source.java @@ -94,4 +94,13 @@ public interface Source<S, D> { * @param state see {@link SourceState} */ public abstract void shutdown(SourceState state); + + /** + * Instead of handling all {@link WorkUnit}s in one run, some {@link Source} may choose to stop early in order to handle the + * proper workload, which can cause multiple runs after the initial run. 
+ * @return If the same job has early stopped + */ + public default boolean isEarlyStopped() { + return false; + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5b8f7dfb/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java index b8fd2f4..bc9f88f 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java @@ -17,15 +17,11 @@ package org.apache.gobblin.cluster; -import java.util.List; import java.util.Properties; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import lombok.extern.slf4j.Slf4j; -import org.apache.hadoop.fs.Path; -import org.apache.helix.HelixManager; import org.quartz.InterruptableJob; import org.quartz.Job; @@ -34,8 +30,6 @@ import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.apache.gobblin.annotation.Alpha; -import org.apache.gobblin.metrics.Tag; -import org.apache.gobblin.runtime.JobLauncher; import org.apache.gobblin.runtime.listeners.JobListener; import org.apache.gobblin.scheduler.BaseGobblinJob; import org.apache.gobblin.scheduler.JobScheduler; @@ -56,24 +50,18 @@ public class GobblinHelixJob extends BaseGobblinJob implements InterruptableJob @Override public void executeImpl(JobExecutionContext context) throws JobExecutionException { JobDataMap dataMap = context.getJobDetail().getJobDataMap(); - ConcurrentHashMap runningMap = (ConcurrentHashMap)dataMap.get(GobblinHelixJobScheduler.JOB_RUNNING_MAP); final JobScheduler jobScheduler = (JobScheduler) dataMap.get(JobScheduler.JOB_SCHEDULER_KEY); // the properties may get mutated during job execution and the scheduler reuses it for 
the next round of scheduling, // so clone it final Properties jobProps = (Properties)((Properties) dataMap.get(JobScheduler.PROPERTIES_KEY)).clone(); final JobListener jobListener = (JobListener) dataMap.get(JobScheduler.JOB_LISTENER_KEY); - HelixManager helixManager = (HelixManager) dataMap.get(GobblinHelixJobScheduler.HELIX_MANAGER_KEY); - Path appWorkDir = (Path) dataMap.get(GobblinHelixJobScheduler.APPLICATION_WORK_DIR_KEY); - @SuppressWarnings("unchecked") - List<? extends Tag<?>> eventMetadata = (List<? extends Tag<?>>) dataMap.get(GobblinHelixJobScheduler.METADATA_TAGS); try { - final JobLauncher jobLauncher = new GobblinHelixJobLauncher(jobProps, helixManager, appWorkDir, eventMetadata, runningMap); if (Boolean.valueOf(jobProps.getProperty(GobblinClusterConfigurationKeys.JOB_EXECUTE_IN_SCHEDULING_THREAD, Boolean.toString(GobblinClusterConfigurationKeys.JOB_EXECUTE_IN_SCHEDULING_THREAD_DEFAULT)))) { - jobScheduler.runJob(jobProps, jobListener, jobLauncher); + jobScheduler.runJob(jobProps, jobListener); } else { - cancellable = jobScheduler.scheduleJobImmediately(jobProps, jobListener, jobLauncher); + cancellable = jobScheduler.scheduleJobImmediately(jobProps, jobListener); } } catch (Throwable t) { throw new JobExecutionException(t); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5b8f7dfb/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java index 48b12f2..ef12162 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java @@ -19,23 +19,25 @@ package org.apache.gobblin.cluster; import java.net.URI; import 
java.net.URISyntaxException; -import java.util.Collection; import java.util.List; -import java.util.Map; import java.util.Properties; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.PropertiesUtils; import org.apache.hadoop.fs.Path; import org.apache.helix.HelixManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Optional; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.eventbus.EventBus; import com.google.common.eventbus.Subscribe; @@ -47,8 +49,6 @@ import org.apache.gobblin.cluster.event.NewJobConfigArrivalEvent; import org.apache.gobblin.cluster.event.UpdateJobConfigArrivalEvent; import org.apache.gobblin.instrumented.Instrumented; import org.apache.gobblin.instrumented.StandardMetricsBridge; -import org.apache.gobblin.metrics.ContextAwareMetric; -import org.apache.gobblin.metrics.ContextAwareGauge; import org.apache.gobblin.metrics.ContextAwareTimer; import org.apache.gobblin.metrics.GobblinMetrics; import org.apache.gobblin.metrics.MetricContext; @@ -66,6 +66,7 @@ import org.apache.gobblin.scheduler.SchedulerService; import javax.annotation.Nonnull; +import lombok.Getter; /** @@ -79,11 +80,6 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe private static final Logger LOGGER = LoggerFactory.getLogger(GobblinHelixJobScheduler.class); - static final String HELIX_MANAGER_KEY = "helixManager"; - static final String APPLICATION_WORK_DIR_KEY = "applicationWorkDir"; - static final String METADATA_TAGS = "metadataTags"; - static final String JOB_RUNNING_MAP = 
"jobRunningMap"; - private final Properties properties; private final HelixManager helixManager; private final EventBus eventBus; @@ -262,13 +258,8 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe @Override public void scheduleJob(Properties jobProps, JobListener jobListener) throws JobException { - Map<String, Object> additionalJobDataMap = Maps.newHashMap(); - additionalJobDataMap.put(HELIX_MANAGER_KEY, this.helixManager); - additionalJobDataMap.put(APPLICATION_WORK_DIR_KEY, this.appWorkDir); - additionalJobDataMap.put(METADATA_TAGS, this.metadataTags); - additionalJobDataMap.put(JOB_RUNNING_MAP, this.jobRunningMap); try { - scheduleJob(jobProps, jobListener, additionalJobDataMap, GobblinHelixJob.class); + scheduleJob(jobProps, jobListener, Maps.newHashMap(), GobblinHelixJob.class); } catch (Exception e) { throw new JobException("Failed to schedule job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e); } @@ -280,19 +271,101 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe @Override public void runJob(Properties jobProps, JobListener jobListener) throws JobException { - try { - JobLauncher jobLauncher = buildGobblinHelixJobLauncher(jobProps); - runJob(jobProps, jobListener, jobLauncher); - } catch (Exception e) { - throw new JobException("Failed to run job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e); - } + new RetriggeringJobCallable(jobProps, jobListener).call(); } - private GobblinHelixJobLauncher buildGobblinHelixJobLauncher(Properties jobProps) + @Override + public GobblinHelixJobLauncher buildJobLauncher(Properties jobProps) throws Exception { return new GobblinHelixJobLauncher(jobProps, this.helixManager, this.appWorkDir, this.metadataTags, this.jobRunningMap); } + private class RetriggeringJobCallable implements Callable { + Properties jobProps; + JobListener jobListener; + + public RetriggeringJobCallable(Properties jobProps, JobListener jobListener) { + 
this.jobProps = jobProps; + this.jobListener = jobListener; + } + + private boolean isRetriggeringEnabled() { + return PropertiesUtils.getPropAsBoolean(jobProps, ConfigurationKeys.JOB_RETRIGGERING_ENABLED, ConfigurationKeys.DEFAULT_JOB_RETRIGGERING_ENABLED); + } + + @Getter + JobLauncher currentJobLauncher = null; + + @Override + public Void call() throws JobException { + try { + while (true) { + currentJobLauncher = buildJobLauncher(jobProps); + boolean isEarlyStopped = runJob(jobProps, jobListener, currentJobLauncher); + boolean isRetriggerEnabled = this.isRetriggeringEnabled(); + if (isEarlyStopped && isRetriggerEnabled) { + LOGGER.info("Job {} will be re-triggered.", jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)); + } else { + break; + } + currentJobLauncher = null; + } + } catch (Exception e) { + LOGGER.error("Failed to run job {}", jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e); + throw new JobException("Failed to run job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e); + } + + return null; + } + } + + public Future<?> scheduleJobImmediately(Properties jobProps, JobListener jobListener) { + RetriggeringJobCallable retriggeringJob = new RetriggeringJobCallable(jobProps, jobListener); + final Future<?> future = this.jobExecutor.submit(retriggeringJob); + return new Future() { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + if (!GobblinHelixJobScheduler.this.isCancelRequested()) { + return false; + } + boolean result = true; + try { + JobLauncher jobLauncher = retriggeringJob.getCurrentJobLauncher(); + if (jobLauncher != null) { + jobLauncher.cancelJob(jobListener); + } + } catch (JobException e) { + LOGGER.error("Failed to cancel job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e); + result = false; + } + if (mayInterruptIfRunning) { + result &= future.cancel(true); + } + return result; + } + + @Override + public boolean isCancelled() { + return future.isCancelled(); + } + + @Override + public 
boolean isDone() { + return future.isDone(); + } + + @Override + public Object get() throws InterruptedException, ExecutionException { + return future.get(); + } + + @Override + public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return future.get(timeout, unit); + } + }; + } + @Subscribe public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) { LOGGER.info("Received new job configuration of job " + newJobArrival.getJobName()); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5b8f7dfb/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/extract/AbstractSource.java ---------------------------------------------------------------------- diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/extract/AbstractSource.java b/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/extract/AbstractSource.java index db51597..d7fd05e 100644 --- a/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/extract/AbstractSource.java +++ b/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/extract/AbstractSource.java @@ -139,4 +139,9 @@ public abstract class AbstractSource<S, D> implements Source<S, D> { public Extract createExtract(TableType type, String namespace, String table) { return this.extractFactory.getUniqueExtract(type, namespace, table); } + + @Override + public boolean isEarlyStopped() { + return false; + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5b8f7dfb/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/ExtractType.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/ExtractType.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/ExtractType.java index a8ae7bf..49cfae9 100644 --- 
a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/ExtractType.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/ExtractType.java @@ -17,6 +17,12 @@ package org.apache.gobblin.source.extractor.extract; +/** + * Different extract types + */ public enum ExtractType { - SNAPSHOT, APPEND_DAILY, APPEND_HOURLY, APPEND_BATCH + SNAPSHOT, // Used iff user wants highwatermark to be set to latest. + APPEND_DAILY, // Used iff user wants highwatermark to be set to a fixed point, like CURRENTDATE - <backoff days>. + APPEND_HOURLY, // Used iff user wants highwatermark to be set to a fixed point, like CURRENTHOUR - <backoff hours>. + APPEND_BATCH // <Please document this> } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5b8f7dfb/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java index 29c98d9..5d5330d 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java @@ -198,7 +198,7 @@ public abstract class QueryBasedSource<S, D> extends AbstractSource<S, D> { combinedState.setProp(ConfigurationKeys.SOURCE_QUERYBASED_END_VALUE, previousWatermark); } - workUnits.addAll(generateWorkUnits(sourceEntity, state, previousWatermark)); + workUnits.addAll(generateWorkUnits(sourceEntity, combinedState, previousWatermark)); } log.info("Total number of workunits for the current run: " + workUnits.size()); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5b8f7dfb/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/partition/Partitioner.java 
---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/partition/Partitioner.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/partition/Partitioner.java index 90da5b5..052d6b4 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/partition/Partitioner.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/partition/Partitioner.java @@ -53,6 +53,7 @@ public class Partitioner { public static final String WATERMARKTIMEFORMAT = "yyyyMMddHHmmss"; public static final String HAS_USER_SPECIFIED_PARTITIONS = "partitioner.hasUserSpecifiedPartitions"; public static final String USER_SPECIFIED_PARTITIONS = "partitioner.userSpecifiedPartitions"; + public static final String IS_EARLY_STOPPED = "partitioner.isEarlyStopped"; public static final Comparator<Partition> ascendingComparator = new Comparator<Partition>() { @Override @@ -109,7 +110,9 @@ public class Partitioner { * Get partitions with low and high water marks * * @param previousWatermark previous water mark from metadata - * @return map of partition intervals + * @return map of partition intervals. 
+ * map's key is interval begin time (in format {@link Partitioner#WATERMARKTIMEFORMAT}) + * map's value is interval end time (in format {@link Partitioner#WATERMARKTIMEFORMAT}) */ @Deprecated public HashMap<Long, Long> getPartitions(long previousWatermark) { @@ -196,9 +199,12 @@ public class Partitioner { * Generate the partitions based on the lists specified by the user in job config */ private List<Partition> createUserSpecifiedPartitions() { + List<Partition> partitions = new ArrayList<>(); List<String> watermarkPoints = state.getPropAsList(USER_SPECIFIED_PARTITIONS); + boolean isEarlyStopped = state.getPropAsBoolean(IS_EARLY_STOPPED); + if (watermarkPoints == null || watermarkPoints.size() == 0 ) { LOG.info("There should be some partition points"); long defaultWatermark = ConfigurationKeys.DEFAULT_WATERMARK_VALUE; @@ -234,7 +240,9 @@ public class Partitioner { highWatermark = adjustWatermark(watermarkPoints.get(i), watermarkType); ExtractType extractType = ExtractType.valueOf(this.state.getProp(ConfigurationKeys.SOURCE_QUERYBASED_EXTRACT_TYPE).toUpperCase()); - if (isFullDump() || isSnapshot(extractType)) { + + // If it is early stop, we should not remove upper bounds + if ((isFullDump() || isSnapshot(extractType)) && !isEarlyStopped) { // The upper bounds can be removed for last work unit partitions.add(new Partition(lowWatermark, highWatermark, true, false)); } else { @@ -242,6 +250,7 @@ public class Partitioner { partitions.add(new Partition(lowWatermark, highWatermark, true, true)); } + return partitions; } @@ -291,18 +300,21 @@ public class Partitioner { } /** - * Get low water mark + * Get low water mark: + * (1) Use {@link ConfigurationKeys#SOURCE_QUERYBASED_START_VALUE} iff it is a full dump (or watermark override is enabled) + * (2) Otherwise use previous watermark (fallback to {@link ConfigurationKeys#SOURCE_QUERYBASED_START_VALUE} iff previous watermark is unavailable) * * @param extractType Extract type * @param watermarkType Watermark type * 
@param previousWatermark Previous water mark * @param deltaForNextWatermark delta number for next water mark - * @return low water mark + * @return low water mark in {@link Partitioner#WATERMARKTIMEFORMAT} */ @VisibleForTesting protected long getLowWatermark(ExtractType extractType, WatermarkType watermarkType, long previousWatermark, int deltaForNextWatermark) { long lowWatermark = ConfigurationKeys.DEFAULT_WATERMARK_VALUE; + if (this.isFullDump() || this.isWatermarkOverride()) { String timeZone = this.state.getProp(ConfigurationKeys.SOURCE_TIMEZONE, ConfigurationKeys.DEFAULT_SOURCE_TIMEZONE); @@ -331,7 +343,7 @@ public class Partitioner { * @param watermarkType Watermark type * @param previousWatermark Previous water mark * @param deltaForNextWatermark delta number for next water mark - * @return snapshot low water mark + * @return Previous watermark (fallback to {@link ConfigurationKeys#SOURCE_QUERYBASED_START_VALUE} iff previous watermark is unavailable) */ private long getSnapshotLowWatermark(WatermarkType watermarkType, long previousWatermark, int deltaForNextWatermark) { LOG.debug("Getting snapshot low water mark"); @@ -362,7 +374,7 @@ public class Partitioner { * @param watermarkType Watermark type * @param previousWatermark Previous water mark * @param deltaForNextWatermark delta number for next water mark - * @return append low water mark + * @return Previous watermark (fallback to {@link ConfigurationKeys#SOURCE_QUERYBASED_START_VALUE} iff previous watermark is unavailable) */ private long getAppendLowWatermark(WatermarkType watermarkType, long previousWatermark, int deltaForNextWatermark) { LOG.debug("Getting append low water mark"); @@ -384,7 +396,7 @@ public class Partitioner { * * @param extractType Extract type * @param watermarkType Watermark type - * @return high water mark + * @return high water mark in {@link Partitioner#WATERMARKTIMEFORMAT} */ @VisibleForTesting protected long getHighWatermark(ExtractType extractType, WatermarkType 
watermarkType) { @@ -598,6 +610,8 @@ public class Partitioner { } /** + * If full dump is true, the low watermark will be based on {@link ConfigurationKeys#SOURCE_QUERYBASED_START_VALUE}. + * Otherwise it will be based on the previous watermark. Please refer to {@link Partitioner#getLowWatermark(ExtractType, WatermarkType, long, int)} * @return full dump or not */ public boolean isFullDump() { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5b8f7dfb/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/Watermark.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/Watermark.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/Watermark.java index e83fc65..53827b4 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/Watermark.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/Watermark.java @@ -26,8 +26,8 @@ public interface Watermark { * Condition statement with the water mark value using the operator.
Example (last_updated_ts >= 2013-01-01 00:00:00 * * @param extractor - * @param water mark value - * @param relational operator between water mark column and value + * @param watermarkValue water mark value + * @param operator relational operator between water mark column and value * @return condition statement */ public String getWatermarkCondition(QueryBasedExtractor<?, ?> extractor, long watermarkValue, String operator); @@ -35,10 +35,10 @@ public interface Watermark { /** * Get partitions for the given range * - * @param low water mark value - * @param high water mark value - * @param partition interval(in hours or days) - * @param maximum number of partitions + * @param lowWatermarkValue low water mark value + * @param highWatermarkValue high water mark value + * @param partitionInterval partition interval (in hours or days) + * @param maxIntervals maximum number of partitions * @return partitions */ public HashMap<Long, Long> getIntervals(long lowWatermarkValue, long highWatermarkValue, long partitionInterval, http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5b8f7dfb/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java index da770af..0c50c32 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java @@ -887,6 +887,10 @@ public abstract class AbstractJobLauncher implements JobLauncher { } } + public boolean isEarlyStopped() { + return this.jobContext.getSource().isEarlyStopped(); + } + /** * Staging data cannot be cleaned if exactly once semantics is used, and the job has unfinished * commit sequences.
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5b8f7dfb/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobLauncher.java index 75c558f..52284b7 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobLauncher.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobLauncher.java @@ -67,4 +67,8 @@ public interface JobLauncher extends Closeable { */ public void cancelJob(@Nullable JobListener jobListener) throws JobException; + + public default boolean isEarlyStopped() { + return false; + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5b8f7dfb/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SourceDecorator.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SourceDecorator.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SourceDecorator.java index 9471444..4558325 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SourceDecorator.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SourceDecorator.java @@ -116,4 +116,9 @@ public class SourceDecorator<S, D> implements WorkUnitStreamSource<S, D>, Decora public Object getDecoratedObject() { return this.source; } + + @Override + public boolean isEarlyStopped() { + return this.source.isEarlyStopped(); + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5b8f7dfb/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java index 7ef6d85..f2e254d 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java @@ -24,14 +24,13 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.configuration.ConfigurationException; import org.apache.hadoop.fs.Path; @@ -63,6 +62,7 @@ import com.google.common.collect.Sets; import com.google.common.io.Closer; import com.google.common.util.concurrent.AbstractIdleService; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.configuration.ConfigurationKeys; @@ -135,6 +135,7 @@ public class JobScheduler extends AbstractIdleService { private final Closer closer = Closer.create(); + @Getter private volatile boolean cancelRequested = false; public JobScheduler(Properties properties, SchedulerService scheduler) @@ -254,17 +255,20 @@ public class JobScheduler extends AbstractIdleService { * with scheduling the job */ public Future<?> scheduleJobImmediately(Properties jobProps, JobListener jobListener, JobLauncher jobLauncher) { - Runnable runnable = new Runnable() { + Callable<Void> callable = new Callable<Void>() { @Override - public void run() { + public Void call() throws JobException { try { runJob(jobProps, jobListener, jobLauncher); } catch (JobException je) { LOG.error("Failed to run job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), je); + throw je; } + return null; } }; - final 
Future<?> future = this.jobExecutor.submit(runnable); + + final Future<?> future = this.jobExecutor.submit(callable); return new Future() { @Override public boolean cancel(boolean mayInterruptIfRunning) { @@ -306,6 +310,14 @@ public class JobScheduler extends AbstractIdleService { }; } + public Future<?> scheduleJobImmediately(Properties jobProps, JobListener jobListener) throws JobException { + try { + return scheduleJobImmediately(jobProps, jobListener, buildJobLauncher(jobProps)); + } catch (Exception e) { + throw new JobException("Job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY) + " cannot be immediately scheduled.", e); + } + } + /** * Submit a runnable to the {@link ExecutorService} of this {@link JobScheduler}. * @param runnable the runnable to submit to the job executor @@ -419,12 +431,16 @@ public class JobScheduler extends AbstractIdleService { public void runJob(Properties jobProps, JobListener jobListener) throws JobException { try { - runJob(jobProps, jobListener, JobLauncherFactory.newJobLauncher(this.properties, jobProps)); + runJob(jobProps, jobListener, buildJobLauncher(jobProps)); } catch (Exception e) { throw new JobException("Failed to run job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e); } } + public JobLauncher buildJobLauncher(Properties jobProps) throws Exception { + return JobLauncherFactory.newJobLauncher(this.properties, jobProps); + } + /** * Run a job. * @@ -441,9 +457,10 @@ public class JobScheduler extends AbstractIdleService { * @param jobProps Job configuration properties * @param jobListener {@link JobListener} used for callback, can be <em>null</em> if no callback is needed. 
* @param jobLauncher a {@link JobLauncher} object used to launch the job to run + * @return If current job needs retriggering * @throws JobException when there is anything wrong with running the job */ - public void runJob(Properties jobProps, JobListener jobListener, JobLauncher jobLauncher) + public boolean runJob(Properties jobProps, JobListener jobListener, JobLauncher jobLauncher) throws JobException { Preconditions.checkArgument(jobProps.containsKey(ConfigurationKeys.JOB_NAME_KEY), "A job must have a job name specified by job.name"); @@ -453,16 +470,20 @@ public class JobScheduler extends AbstractIdleService { boolean disabled = Boolean.valueOf(jobProps.getProperty(ConfigurationKeys.JOB_DISABLED_KEY, "false")); if (disabled) { LOG.info("Skipping disabled job " + jobName); - return; + return false; } // Launch the job try (Closer closer = Closer.create()) { closer.register(jobLauncher).launchJob(jobListener); boolean runOnce = Boolean.valueOf(jobProps.getProperty(ConfigurationKeys.JOB_RUN_ONCE_KEY, "false")); - if (runOnce && this.scheduledJobs.containsKey(jobName)) { + boolean isEarlyStopped = jobLauncher.isEarlyStopped(); + if (!isEarlyStopped && runOnce && this.scheduledJobs.containsKey(jobName)) { this.scheduler.getScheduler().deleteJob(this.scheduledJobs.remove(jobName)); } + + return isEarlyStopped; + } catch (Throwable t) { throw new JobException("Failed to launch and run job " + jobName, t); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5b8f7dfb/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java ---------------------------------------------------------------------- diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java index 872fceb..b16a261 100644 --- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java +++ 
b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java @@ -83,7 +83,6 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> { public static final String USE_ALL_OBJECTS = "use.all.objects"; public static final boolean DEFAULT_USE_ALL_OBJECTS = false; - private static final String ENABLE_DYNAMIC_PARTITIONING = "salesforce.enableDynamicPartitioning"; private static final String ENABLE_DYNAMIC_PROBING = "salesforce.enableDynamicProbing"; private static final String DYNAMIC_PROBING_LIMIT = "salesforce.dynamicProbingLimit"; private static final int DEFAULT_DYNAMIC_PROBING_LIMIT = 1000; @@ -101,10 +100,15 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> { private static final String PROBE_PARTITION_QUERY_TEMPLATE = "SELECT count(${column}) cnt FROM ${table} " + "WHERE ${column} ${greater} ${start} AND ${column} ${less} ${end}"; + private static final String ENABLE_DYNAMIC_PARTITIONING = "salesforce.enableDynamicPartitioning"; + private static final String EARLY_STOP_TOTAL_RECORDS_LIMIT = "salesforce.earlyStopTotalRecordsLimit"; + private static final long DEFAULT_EARLY_STOP_TOTAL_RECORDS_LIMIT = DEFAULT_MIN_TARGET_PARTITION_SIZE * 4; + private static final String SECONDS_FORMAT = "yyyy-MM-dd-HH:mm:ss"; private static final String ZERO_TIME_SUFFIX = "-00:00:00"; private static final Gson GSON = new Gson(); + private boolean isEarlyStopped = false; @VisibleForTesting SalesforceSource(LineageInfo lineageInfo) { @@ -122,6 +126,11 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> { } @Override + public boolean isEarlyStopped() { + return isEarlyStopped; + } + + @Override protected void addLineageSourceInfo(SourceState sourceState, SourceEntity entity, WorkUnit workUnit) { DatasetDescriptor source = new DatasetDescriptor(DatasetConstants.PLATFORM_SALESFORCE, entity.getSourceEntityName()); @@ -147,17 +156,56 @@ public class SalesforceSource extends 
QueryBasedSource<JsonArray, JsonElement> { return super.generateWorkUnits(sourceEntity, state, previousWatermark); } - Partition partition = new Partitioner(state).getGlobalPartition(previousWatermark); + Partitioner partitioner = new Partitioner(state); + if (isEarlyStopEnabled(state) && partitioner.isFullDump()) { + throw new UnsupportedOperationException("Early stop mode cannot work with full dump mode."); + } + + Partition partition = partitioner.getGlobalPartition(previousWatermark); Histogram histogram = getHistogram(sourceEntity.getSourceEntityName(), watermarkColumn, state, partition); - String specifiedPartitions = generateSpecifiedPartitions(histogram, minTargetPartitionSize, maxPartitions, - partition.getLowWatermark(), partition.getHighWatermark()); + // we should look if the count is too big, cut off early if count exceeds the limit, or bucket size is too large + + Histogram histogramAdjust; + + // TODO: we should consider move this logic into getRefinedHistogram so that we can early terminate the search + if (isEarlyStopEnabled(state)) { + histogramAdjust = new Histogram(); + for (HistogramGroup group : histogram.getGroups()) { + histogramAdjust.add(group); + if (histogramAdjust.getTotalRecordCount() > state + .getPropAsLong(EARLY_STOP_TOTAL_RECORDS_LIMIT, DEFAULT_EARLY_STOP_TOTAL_RECORDS_LIMIT)) { + break; + } + } + } else { + histogramAdjust = histogram; + } + + long expectedHighWatermark = partition.getHighWatermark(); + if (histogramAdjust.getGroups().size() < histogram.getGroups().size()) { + HistogramGroup lastPlusOne = histogram.get(histogramAdjust.getGroups().size()); + long earlyStopHighWatermark = Long.parseLong(Utils.toDateTimeFormat(lastPlusOne.getKey(), SECONDS_FORMAT, Partitioner.WATERMARKTIMEFORMAT)); + log.info("Job {} will be stopped earlier. 
[LW : {}, early-stop HW : {}, expected HW : {}]", state.getProp(ConfigurationKeys.JOB_NAME_KEY), partition.getLowWatermark(), earlyStopHighWatermark, expectedHighWatermark); + this.isEarlyStopped = true; + expectedHighWatermark = earlyStopHighWatermark; + } else { + log.info("Job {} will be finished in a single run. [LW : {}, expected HW : {}]", state.getProp(ConfigurationKeys.JOB_NAME_KEY), partition.getLowWatermark(), expectedHighWatermark); + } + + String specifiedPartitions = generateSpecifiedPartitions(histogramAdjust, minTargetPartitionSize, maxPartitions, + partition.getLowWatermark(), expectedHighWatermark); state.setProp(Partitioner.HAS_USER_SPECIFIED_PARTITIONS, true); state.setProp(Partitioner.USER_SPECIFIED_PARTITIONS, specifiedPartitions); + state.setProp(Partitioner.IS_EARLY_STOPPED, isEarlyStopped); return super.generateWorkUnits(sourceEntity, state, previousWatermark); } + private boolean isEarlyStopEnabled (State state) { + return state.getPropAsBoolean(ConfigurationKeys.SOURCE_EARLY_STOP_ENABLED, ConfigurationKeys.DEFAULT_SOURCE_EARLY_STOP_ENABLED); + } + String generateSpecifiedPartitions(Histogram histogram, int minTargetPartitionSize, int maxPartitions, long lowWatermark, long expectedHighWatermark) { int interval = computeTargetPartitionSize(histogram, minTargetPartitionSize, maxPartitions); @@ -183,7 +231,12 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> { partitionPoints.add(Utils.toDateTimeFormat(group.getKey(), SECONDS_FORMAT, Partitioner.WATERMARKTIMEFORMAT)); } - // Move the candidate to a new bucket if the attempted total is 2x of interval + /** + * Using greedy algorithm by keep adding group until it exceeds the interval size (x2) + * Proof: Assuming nth group violates 2 x interval size, then all groups from 0th to (n-1)th, plus nth group, + * will have total size larger or equal to interval x 2. 
Hence, we are saturating all intervals (with original size) + * without leaving any unused space in between. We could choose x3,x4... but it is not space efficient. + */ if (count != 0 && count + group.count >= 2 * interval) { // Summarize current group statistics.addValue(count); @@ -351,7 +404,7 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> { // make a copy of the histogram list and add a dummy entry at the end to avoid special processing of the last group List<HistogramGroup> list = new ArrayList(histogram.getGroups()); - Date hwmDate = Utils.toDate(partition.getLowWatermark(), Partitioner.WATERMARKTIMEFORMAT); + Date hwmDate = Utils.toDate(partition.getHighWatermark(), Partitioner.WATERMARKTIMEFORMAT); list.add(new HistogramGroup(Utils.epochToDate(hwmDate.getTime(), SECONDS_FORMAT), 0)); for (int i = 0; i < list.size() - 1; i++) { @@ -447,7 +500,7 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> { // exchange the first histogram group key with the global low watermark to ensure that the low watermark is captured // in the range of generated partitions - HistogramGroup firstGroup = histogram.getGroups().get(0); + HistogramGroup firstGroup = histogram.get(0); Date lwmDate = Utils.toDate(partition.getLowWatermark(), Partitioner.WATERMARKTIMEFORMAT); histogram.getGroups().set(0, new HistogramGroup(Utils.epochToDate(lwmDate.getTime(), SECONDS_FORMAT), firstGroup.getCount())); @@ -523,6 +576,10 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> { totalRecordCount += histogram.totalRecordCount; } + HistogramGroup get(int idx) { + return this.groups.get(idx); + } + @Override public String toString() { return groups.toString(); @@ -555,7 +612,6 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> { } catch (RestApiProcessingException e) { throw Throwables.propagate(e); } - } private static Set<SourceEntity> getSourceEntities(String response) {
