This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new c7762fb39 [GOBBLIN-1912] Highwtm query record parsing fix (#3777)
c7762fb39 is described below
commit c7762fb398f437443f509db0e888eca84f5201ec
Author: Gautam Kumar <[email protected]>
AuthorDate: Fri Sep 15 22:29:28 2023 +0530
[GOBBLIN-1912] Highwtm query record parsing fix (#3777)
* Updating response parsing for Salesforce high watermark fetch
- We updated the Salesforce high watermark query to use the MAX aggregate
function instead of ORDER BY with LIMIT 1.
- The response to the aggregate query is shaped differently from the previous
version: the value is returned under the key "expr0" instead of under the
watermark column name.
- Updating the parsing logic accordingly as part of this PR.
* Fixed watermark value format in test
* Rearranged imports based on codestyle-intellij-gobblin
---
.../gobblin/salesforce/SalesforceExtractor.java | 31 +++++++-----
.../salesforce/SalesforceExtractorTest.java | 55 +++++++++++++++++-----
2 files changed, 62 insertions(+), 24 deletions(-)
diff --git
a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
index 506c5141a..50935ed60 100644
---
a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
+++
b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
@@ -17,7 +17,6 @@
package org.apache.gobblin.salesforce;
-import com.google.common.collect.Iterators;
import java.io.ByteArrayInputStream;
import java.io.FileNotFoundException;
import java.net.URI;
@@ -30,9 +29,9 @@ import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
-
import java.util.stream.Collectors;
import java.util.stream.Stream;
+
import org.apache.commons.lang.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.NameValuePair;
@@ -42,6 +41,7 @@ import org.apache.http.message.BasicNameValuePair;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
@@ -60,6 +60,9 @@ import com.sforce.async.QueryResultList;
import com.sforce.soap.partner.PartnerConnection;
import com.sforce.ws.ConnectorConfig;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.password.PasswordManager;
@@ -71,7 +74,6 @@ import
org.apache.gobblin.source.extractor.exception.RestApiConnectionException;
import org.apache.gobblin.source.extractor.exception.SchemaException;
import org.apache.gobblin.source.extractor.extract.Command;
import org.apache.gobblin.source.extractor.extract.CommandOutput;
-import org.apache.gobblin.source.jdbc.SqlQueryUtils;
import org.apache.gobblin.source.extractor.extract.restapi.RestApiCommand;
import
org.apache.gobblin.source.extractor.extract.restapi.RestApiCommand.RestApiCommandType;
import org.apache.gobblin.source.extractor.extract.restapi.RestApiConnector;
@@ -80,12 +82,12 @@ import org.apache.gobblin.source.extractor.schema.Schema;
import org.apache.gobblin.source.extractor.utils.Utils;
import org.apache.gobblin.source.extractor.watermark.Predicate;
import org.apache.gobblin.source.extractor.watermark.WatermarkType;
+import org.apache.gobblin.source.jdbc.SqlQueryUtils;
import org.apache.gobblin.source.workunit.WorkUnit;
-import lombok.Data;
-import lombok.extern.slf4j.Slf4j;
-
-import static org.apache.gobblin.salesforce.SalesforceConfigurationKeys.*;
+import static
org.apache.gobblin.salesforce.SalesforceConfigurationKeys.PK_CHUNKING_BATCH_RESULT_ID_PAIRS;
+import static
org.apache.gobblin.salesforce.SalesforceConfigurationKeys.PK_CHUNKING_JOB_ID;
+import static
org.apache.gobblin.salesforce.SalesforceConfigurationKeys.SOURCE_QUERYBASED_SALESFORCE_IS_SOFT_DELETES_PULL_DISABLED;
/**
* An implementation of salesforce extractor for extracting data from SFDC
@@ -98,6 +100,13 @@ public class SalesforceExtractor extends RestApiExtractor {
private static final String SALESFORCE_DATE_FORMAT = "yyyy-MM-dd";
private static final String SALESFORCE_HOUR_FORMAT = "HH";
private static final String SALESFORCE_SOAP_SERVICE = "/services/Soap/u";
+
+ /*
+ Key under which the response of the aggregate MAX query returns its result;
+ the corresponding value is the maximum value of the column MAX is applied to.
+ */
+ private static final String MAX_QUERY_RESPONSE_KEY = "expr0";
+
private static final Gson GSON = new Gson();
private static final int MAX_RETRY_INTERVAL_SECS = 600;
@@ -286,12 +295,12 @@ public class SalesforceExtractor extends RestApiExtractor
{
JsonArray jsonArray = jsonObject.getAsJsonArray("records");
if (jsonArray == null || jsonArray.size() == 0) {
- return -1;
+ return ConfigurationKeys.DEFAULT_WATERMARK_VALUE;
}
- JsonElement hwmJsonElement =
jsonArray.get(0).getAsJsonObject().get(watermarkColumn);
- if (hwmJsonElement == null) {
- return -1;
+ JsonElement hwmJsonElement =
jsonArray.get(0).getAsJsonObject().get(MAX_QUERY_RESPONSE_KEY);
+ if (hwmJsonElement == null || hwmJsonElement.isJsonNull()) {
+ return ConfigurationKeys.DEFAULT_WATERMARK_VALUE;
}
String value = hwmJsonElement.getAsString();
diff --git
a/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceExtractorTest.java
b/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceExtractorTest.java
index c1b0d69c3..521db5fe2 100644
---
a/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceExtractorTest.java
+++
b/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceExtractorTest.java
@@ -16,9 +16,16 @@
*/
package org.apache.gobblin.salesforce;
-import com.google.common.collect.ImmutableList;
import java.util.Collections;
import java.util.List;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
@@ -33,10 +40,6 @@ import
org.apache.gobblin.source.extractor.watermark.Predicate;
import org.apache.gobblin.source.extractor.watermark.TimestampWatermark;
import org.apache.gobblin.source.extractor.watermark.WatermarkType;
import org.apache.gobblin.source.workunit.WorkUnit;
-import org.testng.Assert;
-import org.testng.annotations.BeforeTest;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
public class SalesforceExtractorTest {
@@ -114,16 +117,42 @@ public class SalesforceExtractorTest {
return new Object[][] {
{
"{}",
- -1L
+ ConfigurationKeys.DEFAULT_WATERMARK_VALUE
},
{
- "{'records': [{}]}",
- -1L
+ "{\"records\": [{}]}",
+ ConfigurationKeys.DEFAULT_WATERMARK_VALUE
},
{
- String.format("{'records': [{'%s': 20230914100900}]}",
DEFAULT_WATERMARK_COLUMN),
- 20230914100900L
- }
+ "{"
+ + " \"totalSize\": 1,"
+ + " \"done\": true,"
+ + " \"records\": ["
+ + " {"
+ + " \"attributes\": {"
+ + " \"type\": \"AggregateResult\""
+ + " },"
+ + " \"expr0\": null"
+ + " }"
+ + " ]"
+ + "}",
+ ConfigurationKeys.DEFAULT_WATERMARK_VALUE
+ },
+ {
+ "{"
+ + " \"totalSize\": 1,"
+ + " \"done\": true,"
+ + " \"records\": ["
+ + " {"
+ + " \"attributes\": {"
+ + " \"type\": \"AggregateResult\""
+ + " },"
+ + " \"expr0\": \"2023-09-15T05:21:41.000Z\""
+ + " }"
+ + " ]"
+ + "}",
+ 20230915052141L
+ },
};
}
@@ -133,7 +162,7 @@ public class SalesforceExtractorTest {
RestApiCommand command = new RestApiCommand();
response.put(command, commandOutputAsStr);
long actualHighWtm =
- _classUnderTest.getHighWatermark(response, DEFAULT_WATERMARK_COLUMN,
DEFAULT_WATERMARK_VALUE_FORMAT);
+ _classUnderTest.getHighWatermark(response, DEFAULT_WATERMARK_COLUMN,
SalesforceExtractor.SALESFORCE_TIMESTAMP_FORMAT);
Assert.assertEquals(actualHighWtm, expectedHwm);
}
-}
\ No newline at end of file
+}