Repository: incubator-gobblin
Updated Branches:
  refs/heads/master d1950135e -> 06bcc948c


[GOBBLIN-260] Fix Salesforce dynamic partitioning bugs

Closes #2112 from jack-moseley/salesforce


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/06bcc948
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/06bcc948
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/06bcc948

Branch: refs/heads/master
Commit: 06bcc948c0185477b9e3ed2e6119db67f75d63c2
Parents: d195013
Author: Jack Moseley <[email protected]>
Authored: Wed Sep 27 09:20:34 2017 -0700
Committer: Hung Tran <[email protected]>
Committed: Wed Sep 27 09:20:34 2017 -0700

----------------------------------------------------------------------
 .../gobblin/salesforce/SalesforceExtractor.java |  2 +-
 .../gobblin/salesforce/SalesforceSource.java    | 36 +++++++++++++++++---
 2 files changed, 32 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/06bcc948/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
----------------------------------------------------------------------
diff --git 
a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
 
b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
index 4498d72..22a0850 100644
--- 
a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
+++ 
b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
@@ -91,7 +91,7 @@ import lombok.extern.slf4j.Slf4j;
 public class SalesforceExtractor extends RestApiExtractor {
 
   private static final String SOQL_RESOURCE = "/queryAll";
-  private static final String SALESFORCE_TIMESTAMP_FORMAT = 
"yyyy-MM-dd'T'HH:mm:ss'.000Z'";
+  public static final String SALESFORCE_TIMESTAMP_FORMAT = 
"yyyy-MM-dd'T'HH:mm:ss'.000Z'";
   private static final String SALESFORCE_DATE_FORMAT = "yyyy-MM-dd";
   private static final String SALESFORCE_HOUR_FORMAT = "HH";
   private static final String SALESFORCE_SOAP_AUTH_SERVICE = 
"/services/Soap/u";

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/06bcc948/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
----------------------------------------------------------------------
diff --git 
a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
 
b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
index 3b6b729..6089415 100644
--- 
a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
+++ 
b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
@@ -79,7 +79,7 @@ public class SalesforceSource extends 
QueryBasedSource<JsonArray, JsonElement> {
 
   private static final String ENABLE_DYNAMIC_PARTITIONING = 
"salesforce.enableDynamicPartitioning";
   private static final String DAY_PARTITION_QUERY_TEMPLATE = "SELECT 
count(${column}) cnt, DAY_ONLY(${column}) time FROM ${table} "
-      + "WHERE CALENDAR_YEAR(${column}) = ${year} GROUP BY DAY_ONLY(${column}) 
ORDER BY DAY_ONLY(${column})";
+      + "WHERE ${column} ${greater} ${start} AND ${column} ${less} ${end} 
GROUP BY DAY_ONLY(${column}) ORDER BY DAY_ONLY(${column})";
   private static final String DAY_FORMAT = "yyyy-MM-dd";
 
   private static final Gson GSON = new Gson();
@@ -101,17 +101,18 @@ public class SalesforceSource extends 
QueryBasedSource<JsonArray, JsonElement> {
             .toUpperCase());
     String watermarkColumn = 
state.getProp(ConfigurationKeys.EXTRACT_DELTA_FIELDS_KEY);
 
+    int maxPartitions = 
state.getPropAsInt(ConfigurationKeys.SOURCE_MAX_NUMBER_OF_PARTITIONS,
+        ConfigurationKeys.DEFAULT_MAX_NUMBER_OF_PARTITIONS);
+
     // Only support time related watermark
     if (watermarkType == WatermarkType.SIMPLE || 
Strings.isNullOrEmpty(watermarkColumn)
-        || !state.getPropAsBoolean(ENABLE_DYNAMIC_PARTITIONING)) {
+        || !state.getPropAsBoolean(ENABLE_DYNAMIC_PARTITIONING) || 
maxPartitions <= 1) {
       return super.generateWorkUnits(sourceEntity, state, previousWatermark);
     }
 
     Partition partition = new 
Partitioner(state).getGlobalPartition(previousWatermark);
     Histogram histogram = getHistogram(sourceEntity.getSourceEntityName(), 
watermarkColumn, state, partition);
 
-    int maxPartitions = 
state.getPropAsInt(ConfigurationKeys.SOURCE_MAX_NUMBER_OF_PARTITIONS,
-        ConfigurationKeys.DEFAULT_MAX_NUMBER_OF_PARTITIONS);
     String specifiedPartitions = generateSpecifiedPartitions(histogram, 
maxPartitions, partition.getHighWatermark());
     state.setProp(Partitioner.HAS_USER_SPECIFIED_PARTITIONS, true);
     state.setProp(Partitioner.USER_SPECIFIED_PARTITIONS, specifiedPartitions);
@@ -189,10 +190,12 @@ public class SalesforceSource extends 
QueryBasedSource<JsonArray, JsonElement> {
     Date startDate = Utils.toDate(partition.getLowWatermark(), 
Partitioner.WATERMARKTIMEFORMAT);
     calendar.setTime(startDate);
     int startYear = calendar.get(Calendar.YEAR);
+    String lowWatermarkDate = Utils.dateToString(startDate, 
SalesforceExtractor.SALESFORCE_TIMESTAMP_FORMAT);
 
     Date endDate = Utils.toDate(partition.getHighWatermark(), 
Partitioner.WATERMARKTIMEFORMAT);
     calendar.setTime(endDate);
     int endYear = calendar.get(Calendar.YEAR);
+    String highWatermarkDate = Utils.dateToString(endDate, 
SalesforceExtractor.SALESFORCE_TIMESTAMP_FORMAT);
 
     Map<String, String> values = new HashMap<>();
     values.put("table", entity);
@@ -209,7 +212,23 @@ public class SalesforceSource extends 
QueryBasedSource<JsonArray, JsonElement> {
     }
 
     for (int year = startYear; year <= endYear; year++) {
-      values.put("year", Integer.toString(year));
+
+      if (year == startYear) {
+        values.put("start", lowWatermarkDate);
+        values.put("greater", partition.isLowWatermarkInclusive() ? ">=" : 
">");
+      } else {
+        values.put("start", getDateString(year));
+        values.put("greater", ">=");
+      }
+
+      if (year == endYear) {
+        values.put("end", highWatermarkDate);
+        values.put("less", partition.isHighWatermarkInclusive() ? "<=" : "<");
+      } else {
+        values.put("end", getDateString(year + 1));
+        values.put("less", "<");
+      }
+
       String query = sub.replace(DAY_PARTITION_QUERY_TEMPLATE);
       log.info("Histogram query: " + query);
 
@@ -226,6 +245,13 @@ public class SalesforceSource extends 
QueryBasedSource<JsonArray, JsonElement> {
     return histogram;
   }
 
+  private String getDateString(int year) {
+    Calendar calendar = new GregorianCalendar();
+    calendar.clear();
+    calendar.set(Calendar.YEAR, year);
+    return Utils.dateToString(calendar.getTime(), 
SalesforceExtractor.SALESFORCE_TIMESTAMP_FORMAT);
+  }
+
   private Histogram parseHistogram(CommandOutput<?, ?> response) throws 
DataRecordException {
     log.info("Parse histogram");
     Histogram histogram = new Histogram();

Reply via email to