This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new c7762fb39 [GOBBLIN-1912] Highwtm query record parsing fix (#3777)
c7762fb39 is described below

commit c7762fb398f437443f509db0e888eca84f5201ec
Author: Gautam Kumar <[email protected]>
AuthorDate: Fri Sep 15 22:29:28 2023 +0530

    [GOBBLIN-1912] Highwtm query record parsing fix (#3777)
    
    * Updating response parsing for Salesforce high watermark fetch
    
    - We updated the Salesforce high watermark query to use MAX aggregate function instead of order by with limit 1.
    - The response for the aggregate query is a bit different from the previous version.
    - Updating the parsing logic accordingly as part of this PR.
    
    * Fixed watermark value format in test
    
    * Rearranged imports based on codestyle-intellij-gobblin
---
 .../gobblin/salesforce/SalesforceExtractor.java    | 31 +++++++-----
 .../salesforce/SalesforceExtractorTest.java        | 55 +++++++++++++++++-----
 2 files changed, 62 insertions(+), 24 deletions(-)

diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
index 506c5141a..50935ed60 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
@@ -17,7 +17,6 @@
 
 package org.apache.gobblin.salesforce;
 
-import com.google.common.collect.Iterators;
 import java.io.ByteArrayInputStream;
 import java.io.FileNotFoundException;
 import java.net.URI;
@@ -30,9 +29,9 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
-
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.http.HttpEntity;
 import org.apache.http.NameValuePair;
@@ -42,6 +41,7 @@ import org.apache.http.message.BasicNameValuePair;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.gson.Gson;
 import com.google.gson.JsonArray;
@@ -60,6 +60,9 @@ import com.sforce.async.QueryResultList;
 import com.sforce.soap.partner.PartnerConnection;
 import com.sforce.ws.ConnectorConfig;
 
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.password.PasswordManager;
@@ -71,7 +74,6 @@ import org.apache.gobblin.source.extractor.exception.RestApiConnectionException;
 import org.apache.gobblin.source.extractor.exception.SchemaException;
 import org.apache.gobblin.source.extractor.extract.Command;
 import org.apache.gobblin.source.extractor.extract.CommandOutput;
-import org.apache.gobblin.source.jdbc.SqlQueryUtils;
 import org.apache.gobblin.source.extractor.extract.restapi.RestApiCommand;
 import org.apache.gobblin.source.extractor.extract.restapi.RestApiCommand.RestApiCommandType;
 import org.apache.gobblin.source.extractor.extract.restapi.RestApiConnector;
@@ -80,12 +82,12 @@ import org.apache.gobblin.source.extractor.schema.Schema;
 import org.apache.gobblin.source.extractor.utils.Utils;
 import org.apache.gobblin.source.extractor.watermark.Predicate;
 import org.apache.gobblin.source.extractor.watermark.WatermarkType;
+import org.apache.gobblin.source.jdbc.SqlQueryUtils;
 import org.apache.gobblin.source.workunit.WorkUnit;
 
-import lombok.Data;
-import lombok.extern.slf4j.Slf4j;
-
-import static org.apache.gobblin.salesforce.SalesforceConfigurationKeys.*;
+import static org.apache.gobblin.salesforce.SalesforceConfigurationKeys.PK_CHUNKING_BATCH_RESULT_ID_PAIRS;
+import static org.apache.gobblin.salesforce.SalesforceConfigurationKeys.PK_CHUNKING_JOB_ID;
+import static org.apache.gobblin.salesforce.SalesforceConfigurationKeys.SOURCE_QUERYBASED_SALESFORCE_IS_SOFT_DELETES_PULL_DISABLED;
 
 /**
  * An implementation of salesforce extractor for extracting data from SFDC
@@ -98,6 +100,13 @@ public class SalesforceExtractor extends RestApiExtractor {
   private static final String SALESFORCE_DATE_FORMAT = "yyyy-MM-dd";
   private static final String SALESFORCE_HOUR_FORMAT = "HH";
   private static final String SALESFORCE_SOAP_SERVICE = "/services/Soap/u";
+
+  /*
+  This is the key in the response of the aggregate MAX query, and the corresponding
+  value is the column value on which MAX is applied.
+   */
+  private static final String MAX_QUERY_RESPONSE_KEY = "expr0";
+
   private static final Gson GSON = new Gson();
   private static final int MAX_RETRY_INTERVAL_SECS = 600;
 
@@ -286,12 +295,12 @@ public class SalesforceExtractor extends RestApiExtractor {
 
       JsonArray jsonArray = jsonObject.getAsJsonArray("records");
       if (jsonArray == null || jsonArray.size() == 0) {
-        return -1;
+        return ConfigurationKeys.DEFAULT_WATERMARK_VALUE;
       }
 
-      JsonElement hwmJsonElement = jsonArray.get(0).getAsJsonObject().get(watermarkColumn);
-      if (hwmJsonElement == null) {
-        return -1;
+      JsonElement hwmJsonElement = jsonArray.get(0).getAsJsonObject().get(MAX_QUERY_RESPONSE_KEY);
+      if (hwmJsonElement == null || hwmJsonElement.isJsonNull()) {
+        return ConfigurationKeys.DEFAULT_WATERMARK_VALUE;
       }
 
       String value = hwmJsonElement.getAsString();
diff --git a/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceExtractorTest.java b/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceExtractorTest.java
index c1b0d69c3..521db5fe2 100644
--- a/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceExtractorTest.java
+++ b/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceExtractorTest.java
@@ -16,9 +16,16 @@
  */
 package org.apache.gobblin.salesforce;
 
-import com.google.common.collect.ImmutableList;
 import java.util.Collections;
 import java.util.List;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
@@ -33,10 +40,6 @@ import org.apache.gobblin.source.extractor.watermark.Predicate;
 import org.apache.gobblin.source.extractor.watermark.TimestampWatermark;
 import org.apache.gobblin.source.extractor.watermark.WatermarkType;
 import org.apache.gobblin.source.workunit.WorkUnit;
-import org.testng.Assert;
-import org.testng.annotations.BeforeTest;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
 
 
 public class SalesforceExtractorTest {
@@ -114,16 +117,42 @@ public class SalesforceExtractorTest {
     return new Object[][] {
         {
             "{}",
-            -1L
+            ConfigurationKeys.DEFAULT_WATERMARK_VALUE
         },
         {
-            "{'records': [{}]}",
-            -1L
+            "{\"records\": [{}]}",
+            ConfigurationKeys.DEFAULT_WATERMARK_VALUE
         },
         {
-            String.format("{'records': [{'%s': 20230914100900}]}", DEFAULT_WATERMARK_COLUMN),
-            20230914100900L
-        }
+            "{"
+                + "    \"totalSize\": 1,"
+                + "    \"done\": true,"
+                + "    \"records\": ["
+                + "        {"
+                + "            \"attributes\": {"
+                + "                \"type\": \"AggregateResult\""
+                + "            },"
+                + "            \"expr0\": null"
+                + "        }"
+                + "    ]"
+                + "}",
+            ConfigurationKeys.DEFAULT_WATERMARK_VALUE
+        },
+        {
+            "{"
+                + "    \"totalSize\": 1,"
+                + "    \"done\": true,"
+                + "    \"records\": ["
+                + "        {"
+                + "            \"attributes\": {"
+                + "                \"type\": \"AggregateResult\""
+                + "            },"
+                + "            \"expr0\": \"2023-09-15T05:21:41.000Z\""
+                + "        }"
+                + "    ]"
+                + "}",
+            20230915052141L
+        },
     };
   }
 
@@ -133,7 +162,7 @@ public class SalesforceExtractorTest {
     RestApiCommand command = new RestApiCommand();
     response.put(command, commandOutputAsStr);
     long actualHighWtm =
-        _classUnderTest.getHighWatermark(response, DEFAULT_WATERMARK_COLUMN, DEFAULT_WATERMARK_VALUE_FORMAT);
+        _classUnderTest.getHighWatermark(response, DEFAULT_WATERMARK_COLUMN, SalesforceExtractor.SALESFORCE_TIMESTAMP_FORMAT);
     Assert.assertEquals(actualHighWtm, expectedHwm);
   }
-}
\ No newline at end of file
+}

Reply via email to