This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 6cfdd15d6 [GOBBLIN-1888] Optimizing high watermark metadata query for SFDC (#3750)
6cfdd15d6 is described below
commit 6cfdd15d660cfe748c9184e1324a7d3d18fd317f
Author: Gautam Kumar <[email protected]>
AuthorDate: Mon Aug 28 08:29:54 2023 +0530
[GOBBLIN-1888] Optimizing high watermark metadata query for SFDC (#3750)
Co-authored-by: Gautam Kumar <[email protected]>
---
.../gobblin/salesforce/SalesforceExtractor.java | 11 +--
.../salesforce/SalesforceExtractorTest.java | 109 +++++++++++++++++++++
2 files changed, 112 insertions(+), 8 deletions(-)
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
index d902a91d7..648d1bb2c 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
@@ -237,9 +237,8 @@ public class SalesforceExtractor extends RestApiExtractor {
public List<Command> getHighWatermarkMetadata(String schema, String entity,
String watermarkColumn,
List<Predicate> predicateList) throws HighWatermarkException {
log.debug("Build url to retrieve high watermark");
- String query = "SELECT " + watermarkColumn + " FROM " + entity;
- String defaultPredicate = " " + watermarkColumn + " != null";
- String defaultSortOrder = " ORDER BY " + watermarkColumn + " desc LIMIT 1";
+
+ String query = "SELECT MAX(" + watermarkColumn + ") FROM " + entity;
String existingPredicate = "";
if (this.updatedQuery != null) {
@@ -254,13 +253,9 @@ public class SalesforceExtractor extends RestApiExtractor {
String limitString = getLimitFromInputQuery(query);
query = query.replace(limitString, "");
- Iterator<Predicate> i = predicateList.listIterator();
- while (i.hasNext()) {
- Predicate predicate = i.next();
+ for (Predicate predicate : predicateList) {
query = SqlQueryUtils.addPredicate(query, predicate.getCondition());
}
- query = SqlQueryUtils.addPredicate(query, defaultPredicate);
- query = query + defaultSortOrder;
log.info("getHighWatermarkMetadata - QUERY: " + query);
try {
diff --git a/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceExtractorTest.java b/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceExtractorTest.java
new file mode 100644
index 000000000..d4182ef59
--- /dev/null
+++ b/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceExtractorTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.salesforce;
+
+import com.google.common.collect.ImmutableList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.source.extractor.exception.HighWatermarkException;
+import org.apache.gobblin.source.extractor.exception.RestApiClientException;
+import org.apache.gobblin.source.extractor.extract.Command;
+import org.apache.gobblin.source.extractor.extract.restapi.RestApiCommand;
+import org.apache.gobblin.source.extractor.partition.Partition;
+import org.apache.gobblin.source.extractor.watermark.Predicate;
+import org.apache.gobblin.source.extractor.watermark.TimestampWatermark;
+import org.apache.gobblin.source.extractor.watermark.WatermarkType;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.testng.Assert;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+
+public class SalesforceExtractorTest {
+
+ private static final String DEFAULT_SCHEMA = "test-schema";
+ private static final String DEFAULT_ENTITY = "test-entity";
+ private static final String DEFAULT_WATERMARK_COLUMN =
"test-watermark-column";
+ private static final String GTE_OPERATOR = ">=";
+ private static final String LTE_OPERATOR = "<=";
+ private static final long LWM_VALUE_1 = 20131212121212L;
+ private static final long HWM_VALUE_1 = 20231212121212L;
+ private static final String DEFAULT_WATERMARK_VALUE_FORMAT =
"yyyyMMddHHmmss";
+
+ private SalesforceExtractor _classUnderTest;
+
+ @BeforeTest
+ public void beforeTest() {
+ WorkUnit workUnit = WorkUnit.createEmpty();
+ workUnit.setProp(Partition.IS_LAST_PARTIITON, false);
+ workUnit.setProp(ConfigurationKeys.SOURCE_QUERYBASED_EXTRACT_TYPE,
"SNAPSHOT");
+ WorkUnitState workUnitState = new WorkUnitState(workUnit, new State());
+ workUnitState.setId("test");
+ _classUnderTest = new SalesforceExtractor(workUnitState);
+ }
+
+ @DataProvider
+ private Object[][] provideGetHighWatermarkMetadataTestData() {
+
+ TimestampWatermark watermark =
+ new TimestampWatermark(DEFAULT_WATERMARK_COLUMN,
DEFAULT_WATERMARK_VALUE_FORMAT);
+ String lwmCondition = watermark.getWatermarkCondition(_classUnderTest,
LWM_VALUE_1, GTE_OPERATOR);
+ String hwmCondition = watermark.getWatermarkCondition(_classUnderTest,
HWM_VALUE_1, LTE_OPERATOR);
+ Predicate lwmPredicate =
+ new Predicate(DEFAULT_WATERMARK_COLUMN, LWM_VALUE_1, lwmCondition,
+ _classUnderTest.getWatermarkSourceFormat(WatermarkType.TIMESTAMP),
Predicate.PredicateType.LWM);
+ Predicate hwmPredicate =
+ new Predicate(DEFAULT_WATERMARK_COLUMN, HWM_VALUE_1, hwmCondition,
+ _classUnderTest.getWatermarkSourceFormat(WatermarkType.TIMESTAMP),
Predicate.PredicateType.HWM);
+
+ return new Object[][] {
+ {
+ // With low and high watermark predicates
+ ImmutableList.of(lwmPredicate, hwmPredicate),
+ String.format("SELECT MAX(%s) FROM %s where (%s) and (%s)",
+ DEFAULT_WATERMARK_COLUMN, DEFAULT_ENTITY,
lwmPredicate.getCondition(), hwmPredicate.getCondition())
+ },
+ {
+ // With no predicates
+ ImmutableList.of(),
+ String.format("SELECT MAX(%s) FROM %s",
+ DEFAULT_WATERMARK_COLUMN, DEFAULT_ENTITY)
+ }
+ };
+ }
+
+ @Test(dataProvider = "provideGetHighWatermarkMetadataTestData")
+ public void testGetHighWatermarkMetadata(List<Predicate> predicateList,
+ String restQueryExpected) throws HighWatermarkException,
RestApiClientException {
+
+ List<Command> commandsActual =
+ _classUnderTest.getHighWatermarkMetadata(DEFAULT_SCHEMA,
DEFAULT_ENTITY, DEFAULT_WATERMARK_COLUMN,
+ predicateList);
+
+ String fullUri = new SalesforceConnector(new
State()).getFullUri(SalesforceExtractor.getSoqlUrl(restQueryExpected));
+ List<Command> commandsExpected = Collections.singletonList(
+ new RestApiCommand().build(Collections.singletonList(fullUri),
RestApiCommand.RestApiCommandType.GET));
+
+ Assert.assertEquals(commandsActual.size(), 1);
+ Assert.assertEquals(commandsActual.get(0).getCommandType(),
commandsExpected.get(0).getCommandType());
+ Assert.assertEquals(commandsActual.get(0).getParams(),
commandsExpected.get(0).getParams());
+ }
+}
\ No newline at end of file