This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 6cfdd15d6 [GOBBLIN-1888] Optimizing high watermark metadata query for SFDC (#3750)
6cfdd15d6 is described below

commit 6cfdd15d660cfe748c9184e1324a7d3d18fd317f
Author: Gautam Kumar <[email protected]>
AuthorDate: Mon Aug 28 08:29:54 2023 +0530

    [GOBBLIN-1888] Optimizing high watermark metadata query for SFDC (#3750)
    
    Co-authored-by: Gautam Kumar <[email protected]>
---
 .../gobblin/salesforce/SalesforceExtractor.java    |  11 +--
 .../salesforce/SalesforceExtractorTest.java        | 109 +++++++++++++++++++++
 2 files changed, 112 insertions(+), 8 deletions(-)

diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
index d902a91d7..648d1bb2c 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
@@ -237,9 +237,8 @@ public class SalesforceExtractor extends RestApiExtractor {
   public List<Command> getHighWatermarkMetadata(String schema, String entity, 
String watermarkColumn,
       List<Predicate> predicateList) throws HighWatermarkException {
     log.debug("Build url to retrieve high watermark");
-    String query = "SELECT " + watermarkColumn + " FROM " + entity;
-    String defaultPredicate = " " + watermarkColumn + " != null";
-    String defaultSortOrder = " ORDER BY " + watermarkColumn + " desc LIMIT 1";
+
+    String query = "SELECT MAX(" + watermarkColumn + ") FROM " + entity;
 
     String existingPredicate = "";
     if (this.updatedQuery != null) {
@@ -254,13 +253,9 @@ public class SalesforceExtractor extends RestApiExtractor {
     String limitString = getLimitFromInputQuery(query);
     query = query.replace(limitString, "");
 
-    Iterator<Predicate> i = predicateList.listIterator();
-    while (i.hasNext()) {
-      Predicate predicate = i.next();
+    for (Predicate predicate : predicateList) {
       query = SqlQueryUtils.addPredicate(query, predicate.getCondition());
     }
-    query = SqlQueryUtils.addPredicate(query, defaultPredicate);
-    query = query + defaultSortOrder;
     log.info("getHighWatermarkMetadata - QUERY: " + query);
 
     try {
diff --git a/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceExtractorTest.java b/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceExtractorTest.java
new file mode 100644
index 000000000..d4182ef59
--- /dev/null
+++ b/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceExtractorTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.salesforce;
+
+import com.google.common.collect.ImmutableList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.source.extractor.exception.HighWatermarkException;
+import org.apache.gobblin.source.extractor.exception.RestApiClientException;
+import org.apache.gobblin.source.extractor.extract.Command;
+import org.apache.gobblin.source.extractor.extract.restapi.RestApiCommand;
+import org.apache.gobblin.source.extractor.partition.Partition;
+import org.apache.gobblin.source.extractor.watermark.Predicate;
+import org.apache.gobblin.source.extractor.watermark.TimestampWatermark;
+import org.apache.gobblin.source.extractor.watermark.WatermarkType;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.testng.Assert;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+
+public class SalesforceExtractorTest {
+
+  private static final String DEFAULT_SCHEMA = "test-schema";
+  private static final String DEFAULT_ENTITY = "test-entity";
+  private static final String DEFAULT_WATERMARK_COLUMN = 
"test-watermark-column";
+  private static final String GTE_OPERATOR = ">=";
+  private static final String LTE_OPERATOR = "<=";
+  private static final long LWM_VALUE_1 = 20131212121212L;
+  private static final long HWM_VALUE_1 = 20231212121212L;
+  private static final String DEFAULT_WATERMARK_VALUE_FORMAT = 
"yyyyMMddHHmmss";
+
+  private SalesforceExtractor _classUnderTest;
+
+  @BeforeTest
+  public void beforeTest() {
+    WorkUnit workUnit = WorkUnit.createEmpty();
+    workUnit.setProp(Partition.IS_LAST_PARTIITON, false);
+    workUnit.setProp(ConfigurationKeys.SOURCE_QUERYBASED_EXTRACT_TYPE, 
"SNAPSHOT");
+    WorkUnitState workUnitState = new WorkUnitState(workUnit, new State());
+    workUnitState.setId("test");
+    _classUnderTest = new SalesforceExtractor(workUnitState);
+  }
+
+  @DataProvider
+  private Object[][] provideGetHighWatermarkMetadataTestData() {
+
+    TimestampWatermark watermark =
+        new TimestampWatermark(DEFAULT_WATERMARK_COLUMN, 
DEFAULT_WATERMARK_VALUE_FORMAT);
+    String lwmCondition = watermark.getWatermarkCondition(_classUnderTest, 
LWM_VALUE_1, GTE_OPERATOR);
+    String hwmCondition = watermark.getWatermarkCondition(_classUnderTest, 
HWM_VALUE_1, LTE_OPERATOR);
+    Predicate lwmPredicate =
+        new Predicate(DEFAULT_WATERMARK_COLUMN, LWM_VALUE_1, lwmCondition,
+            _classUnderTest.getWatermarkSourceFormat(WatermarkType.TIMESTAMP), 
Predicate.PredicateType.LWM);
+    Predicate hwmPredicate =
+        new Predicate(DEFAULT_WATERMARK_COLUMN, HWM_VALUE_1, hwmCondition,
+            _classUnderTest.getWatermarkSourceFormat(WatermarkType.TIMESTAMP), 
Predicate.PredicateType.HWM);
+
+    return new Object[][] {
+        {
+            // With low and high watermark predicates
+            ImmutableList.of(lwmPredicate, hwmPredicate),
+            String.format("SELECT MAX(%s) FROM %s where (%s) and (%s)",
+                DEFAULT_WATERMARK_COLUMN, DEFAULT_ENTITY, 
lwmPredicate.getCondition(), hwmPredicate.getCondition())
+        },
+        {
+            // With no predicates
+            ImmutableList.of(),
+            String.format("SELECT MAX(%s) FROM %s",
+                DEFAULT_WATERMARK_COLUMN, DEFAULT_ENTITY)
+        }
+    };
+  }
+
+  @Test(dataProvider = "provideGetHighWatermarkMetadataTestData")
+  public void testGetHighWatermarkMetadata(List<Predicate> predicateList,
+      String restQueryExpected) throws HighWatermarkException, 
RestApiClientException {
+
+    List<Command> commandsActual =
+        _classUnderTest.getHighWatermarkMetadata(DEFAULT_SCHEMA, 
DEFAULT_ENTITY, DEFAULT_WATERMARK_COLUMN,
+            predicateList);
+
+    String fullUri = new SalesforceConnector(new 
State()).getFullUri(SalesforceExtractor.getSoqlUrl(restQueryExpected));
+    List<Command> commandsExpected = Collections.singletonList(
+        new RestApiCommand().build(Collections.singletonList(fullUri), 
RestApiCommand.RestApiCommandType.GET));
+
+    Assert.assertEquals(commandsActual.size(), 1);
+    Assert.assertEquals(commandsActual.get(0).getCommandType(), 
commandsExpected.get(0).getCommandType());
+    Assert.assertEquals(commandsActual.get(0).getParams(), 
commandsExpected.get(0).getParams());
+  }
+}
\ No newline at end of file

Reply via email to