Repository: incubator-gobblin Updated Branches: refs/heads/master d1950135e -> 06bcc948c
[GOBBLIN-260] Fix Salesforce dynamic partitioning bugs Closes #2112 from jack-moseley/salesforce Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/06bcc948 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/06bcc948 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/06bcc948 Branch: refs/heads/master Commit: 06bcc948c0185477b9e3ed2e6119db67f75d63c2 Parents: d195013 Author: Jack Moseley <[email protected]> Authored: Wed Sep 27 09:20:34 2017 -0700 Committer: Hung Tran <[email protected]> Committed: Wed Sep 27 09:20:34 2017 -0700 ---------------------------------------------------------------------- .../gobblin/salesforce/SalesforceExtractor.java | 2 +- .../gobblin/salesforce/SalesforceSource.java | 36 +++++++++++++++++--- 2 files changed, 32 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/06bcc948/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java ---------------------------------------------------------------------- diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java index 4498d72..22a0850 100644 --- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java +++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java @@ -91,7 +91,7 @@ import lombok.extern.slf4j.Slf4j; public class SalesforceExtractor extends RestApiExtractor { private static final String SOQL_RESOURCE = "/queryAll"; - private static final String SALESFORCE_TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss'.000Z'"; + public static final String SALESFORCE_TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss'.000Z'"; private static final String SALESFORCE_DATE_FORMAT = "yyyy-MM-dd"; private static final String SALESFORCE_HOUR_FORMAT = "HH"; private static final String SALESFORCE_SOAP_AUTH_SERVICE = "/services/Soap/u"; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/06bcc948/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java ---------------------------------------------------------------------- diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java index 3b6b729..6089415 100644 --- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java +++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java @@ -79,7 +79,7 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> { private static final String ENABLE_DYNAMIC_PARTITIONING = "salesforce.enableDynamicPartitioning"; private static final String DAY_PARTITION_QUERY_TEMPLATE = "SELECT count(${column}) cnt, DAY_ONLY(${column}) time FROM ${table} " - + "WHERE CALENDAR_YEAR(${column}) = ${year} GROUP BY DAY_ONLY(${column}) ORDER BY DAY_ONLY(${column})"; + + "WHERE ${column} ${greater} ${start} AND ${column} ${less} ${end} GROUP BY DAY_ONLY(${column}) ORDER BY DAY_ONLY(${column})"; private static final String DAY_FORMAT = "yyyy-MM-dd"; private static final Gson GSON = new Gson(); @@ -101,17 +101,18 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> { .toUpperCase()); String watermarkColumn = state.getProp(ConfigurationKeys.EXTRACT_DELTA_FIELDS_KEY); + int maxPartitions = state.getPropAsInt(ConfigurationKeys.SOURCE_MAX_NUMBER_OF_PARTITIONS, + ConfigurationKeys.DEFAULT_MAX_NUMBER_OF_PARTITIONS); + // Only support time related watermark if (watermarkType == WatermarkType.SIMPLE || Strings.isNullOrEmpty(watermarkColumn) - || !state.getPropAsBoolean(ENABLE_DYNAMIC_PARTITIONING)) { + || !state.getPropAsBoolean(ENABLE_DYNAMIC_PARTITIONING) || maxPartitions <= 1) { return super.generateWorkUnits(sourceEntity, state, previousWatermark); } Partition partition = new Partitioner(state).getGlobalPartition(previousWatermark); Histogram histogram = getHistogram(sourceEntity.getSourceEntityName(), watermarkColumn, state, partition); - int maxPartitions = state.getPropAsInt(ConfigurationKeys.SOURCE_MAX_NUMBER_OF_PARTITIONS, - ConfigurationKeys.DEFAULT_MAX_NUMBER_OF_PARTITIONS); String specifiedPartitions = generateSpecifiedPartitions(histogram, maxPartitions, partition.getHighWatermark()); state.setProp(Partitioner.HAS_USER_SPECIFIED_PARTITIONS, true); state.setProp(Partitioner.USER_SPECIFIED_PARTITIONS, specifiedPartitions); @@ -189,10 +190,12 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> { Date startDate = Utils.toDate(partition.getLowWatermark(), Partitioner.WATERMARKTIMEFORMAT); calendar.setTime(startDate); int startYear = calendar.get(Calendar.YEAR); + String lowWatermarkDate = Utils.dateToString(startDate, SalesforceExtractor.SALESFORCE_TIMESTAMP_FORMAT); Date endDate = Utils.toDate(partition.getHighWatermark(), Partitioner.WATERMARKTIMEFORMAT); calendar.setTime(endDate); int endYear = calendar.get(Calendar.YEAR); + String highWatermarkDate = Utils.dateToString(endDate, SalesforceExtractor.SALESFORCE_TIMESTAMP_FORMAT); Map<String, String> values = new HashMap<>(); values.put("table", entity); @@ -209,7 +212,23 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> { } for (int year = startYear; year <= endYear; year++) { - values.put("year", Integer.toString(year)); + + if (year == startYear) { + values.put("start", lowWatermarkDate); + values.put("greater", partition.isLowWatermarkInclusive() ? ">=" : ">"); + } else { + values.put("start", getDateString(year)); + values.put("greater", ">="); + } + + if (year == endYear) { + values.put("end", highWatermarkDate); + values.put("less", partition.isHighWatermarkInclusive() ? "<=" : "<"); + } else { + values.put("end", getDateString(year + 1)); + values.put("less", "<"); + } + String query = sub.replace(DAY_PARTITION_QUERY_TEMPLATE); log.info("Histogram query: " + query); @@ -226,6 +245,13 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> { return histogram; } + private String getDateString(int year) { + Calendar calendar = new GregorianCalendar(); + calendar.clear(); + calendar.set(Calendar.YEAR, year); + return Utils.dateToString(calendar.getTime(), SalesforceExtractor.SALESFORCE_TIMESTAMP_FORMAT); + } + private Histogram parseHistogram(CommandOutput<?, ?> response) throws DataRecordException { log.info("Parse histogram"); Histogram histogram = new Histogram();
