Repository: incubator-gobblin Updated Branches: refs/heads/master 12bb1dcf9 -> c36f2c87b
[GOBBLIN-466] Reuse the same connector for salesforce dynamic partitioning Closes #2338 from yukuai518/sf2 Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/c36f2c87 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/c36f2c87 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/c36f2c87 Branch: refs/heads/master Commit: c36f2c87b754032d0946df0e58514c6932e33a9c Parents: 12bb1dc Author: Kuai Yu <k...@linkedin.com> Authored: Fri Apr 13 17:52:56 2018 -0700 Committer: Hung Tran <hut...@linkedin.com> Committed: Fri Apr 13 17:52:56 2018 -0700 ---------------------------------------------------------------------- .../org/apache/gobblin/salesforce/SalesforceSource.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c36f2c87/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java ---------------------------------------------------------------------- diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java index 3702c2e..aedd97b 100644 --- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java +++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java @@ -109,6 +109,7 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> { private static final Gson GSON = new Gson(); private boolean isEarlyStopped = false; + protected SalesforceConnector salesforceConnector = null; public SalesforceSource() { this.lineageInfo = Optional.absent(); @@ -485,12 +486,19 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> { return histogram; } + protected SalesforceConnector getConnector(State state) { + if (this.salesforceConnector == null) { + this.salesforceConnector = new SalesforceConnector(state); + } + return this.salesforceConnector; + } + /** * Generate the histogram */ private Histogram getHistogram(String entity, String watermarkColumn, SourceState state, Partition partition) { - SalesforceConnector connector = new SalesforceConnector(state); + SalesforceConnector connector = getConnector(state); try { if (!connector.connect()) { @@ -595,7 +603,7 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> { return super.getSourceEntities(state); } - SalesforceConnector connector = new SalesforceConnector(state); + SalesforceConnector connector = getConnector(state); try { if (!connector.connect()) { throw new RuntimeException("Failed to connect.");