[
https://issues.apache.org/jira/browse/GOBBLIN-1904?focusedWorklogId=880083&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-880083
]
ASF GitHub Bot logged work on GOBBLIN-1904:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 13/Sep/23 02:29
Start Date: 13/Sep/23 02:29
Worklog Time Spent: 10m
Work Description: Will-Lo commented on code in PR #3768:
URL: https://github.com/apache/gobblin/pull/3768#discussion_r1323861172
##########
gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceHistogramService.java:
##########
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.salesforce;
+
+import com.google.common.math.DoubleMath;
+import com.google.gson.Gson;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import java.math.RoundingMode;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.text.StrSubstitutor;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.source.extractor.exception.RestApiClientException;
+import
org.apache.gobblin.source.extractor.exception.RestApiConnectionException;
+import
org.apache.gobblin.source.extractor.exception.RestApiProcessingException;
+import org.apache.gobblin.source.extractor.extract.Command;
+import org.apache.gobblin.source.extractor.extract.CommandOutput;
+import org.apache.gobblin.source.extractor.extract.restapi.RestApiConnector;
+import org.apache.gobblin.source.extractor.partition.Partition;
+import org.apache.gobblin.source.extractor.partition.Partitioner;
+import org.apache.gobblin.source.extractor.utils.Utils;
+
+import static org.apache.gobblin.configuration.ConfigurationKeys.*;
+
+
+/**
+ * This class encapsulates everything related to histogram calculation for Salesforce. A histogram here is a
+ * mapping from time intervals to the number of records to be fetched in each interval.
+ */
+@Slf4j
+public class SalesforceHistogramService {
+ private static final int MIN_SPLIT_TIME_MILLIS = 1000;
+ private static final String ZERO_TIME_SUFFIX = "-00:00:00";
+ private static final Gson GSON = new Gson();
+ // this is used to generate histogram buckets smaller than the target
partition size to allow for more even
+ // packing of the generated partitions
+ private static final String PROBE_TARGET_RATIO =
"salesforce.probeTargetRatio";
+ private static final double DEFAULT_PROBE_TARGET_RATIO = 0.60;
+ private static final String DYNAMIC_PROBING_LIMIT =
"salesforce.dynamicProbingLimit";
+ private static final int DEFAULT_DYNAMIC_PROBING_LIMIT = 1000;
+
+ private static final String DAY_PARTITION_QUERY_TEMPLATE =
+ "SELECT count(${column}) cnt, DAY_ONLY(${column}) time FROM ${table} " +
"WHERE ${column} ${greater} ${start}"
+ + " AND ${column} ${less} ${end} GROUP BY DAY_ONLY(${column}) ORDER
BY DAY_ONLY(${column})";
+ private static final String PROBE_PARTITION_QUERY_TEMPLATE = "SELECT
count(${column}) cnt FROM ${table} "
+ + "WHERE ${column} ${greater} ${start} AND ${column} ${less} ${end}";
+
+ protected SalesforceConnector salesforceConnector = null;
+ private final SfConfig sfConfig;
+
/**
 * Creates a histogram service that builds its own {@link SalesforceConnector} lazily, on first use.
 *
 * @param sfConfig Salesforce configuration for this service
 */
SalesforceHistogramService(SfConfig sfConfig) {
  this(sfConfig, null);
}

/**
 * Creates a histogram service that reuses an already-constructed connector.
 *
 * <p>Passing the connector the source already holds avoids building a second one. This may save a repeated
 * authentication or SSL handshake; whether it does depends on the connector's implementation.
 *
 * @param sfConfig Salesforce configuration for this service
 * @param connector connector to reuse; if {@code null}, one is created lazily from the source state on first use
 */
SalesforceHistogramService(SfConfig sfConfig, SalesforceConnector connector) {
  this.sfConfig = sfConfig;
  this.salesforceConnector = connector;
}
+
+ /**
+ * Generate the histogram
+ */
+ Histogram getHistogram(String entity, String watermarkColumn, SourceState
state,
+ Partition partition) {
+ SalesforceConnector connector = getConnector(state);
+
+ try {
+ if (!connector.connect()) {
+ throw new RuntimeException("Failed to connect.");
+ }
+ } catch (RestApiConnectionException e) {
+ throw new RuntimeException("Failed to connect.", e);
+ }
+
+ Histogram histogram = getHistogramByDayBucketing(connector, entity,
watermarkColumn, partition);
+
+ // exchange the first histogram group key with the global low watermark to
ensure that the low watermark is captured
+ // in the range of generated partitions
+ HistogramGroup firstGroup = histogram.get(0);
+ Date lwmDate = Utils.toDate(partition.getLowWatermark(),
Partitioner.WATERMARKTIMEFORMAT);
+ histogram.getGroups().set(0, new
HistogramGroup(Utils.epochToDate(lwmDate.getTime(),
SalesforceSource.SECONDS_FORMAT),
+ firstGroup.getCount()));
+
+ // refine the histogram
+ if (state.getPropAsBoolean(SalesforceSource.ENABLE_DYNAMIC_PROBING)) {
+ histogram = getRefinedHistogram(connector, entity, watermarkColumn,
state, partition, histogram);
+ }
+
+ return histogram;
+ }
+
+ /**
+ * Get a histogram with day granularity buckets.
+ */
+ private Histogram getHistogramByDayBucketing(SalesforceConnector connector,
String entity, String watermarkColumn,
+ Partition partition) {
+ Histogram histogram = new Histogram();
+
+ Calendar calendar = new GregorianCalendar();
+ Date startDate = Utils.toDate(partition.getLowWatermark(),
Partitioner.WATERMARKTIMEFORMAT);
+ calendar.setTime(startDate);
+ int startYear = calendar.get(Calendar.YEAR);
+ String lowWatermarkDate = Utils.dateToString(startDate,
SalesforceExtractor.SALESFORCE_TIMESTAMP_FORMAT);
+
+ Date endDate = Utils.toDate(partition.getHighWatermark(),
Partitioner.WATERMARKTIMEFORMAT);
+ calendar.setTime(endDate);
+ int endYear = calendar.get(Calendar.YEAR);
+ String highWatermarkDate = Utils.dateToString(endDate,
SalesforceExtractor.SALESFORCE_TIMESTAMP_FORMAT);
+
+ Map<String, String> values = new HashMap<>();
+ values.put("table", entity);
+ values.put("column", watermarkColumn);
+ StrSubstitutor sub = new StrSubstitutor(values);
+
+ for (int year = startYear; year <= endYear; year++) {
+ if (year == startYear) {
+ values.put("start", lowWatermarkDate);
+ values.put("greater", partition.isLowWatermarkInclusive() ? ">=" :
">");
+ } else {
+ values.put("start", getDateString(year));
+ values.put("greater", ">=");
+ }
+
+ if (year == endYear) {
+ values.put("end", highWatermarkDate);
+ values.put("less", partition.isHighWatermarkInclusive() ? "<=" : "<");
+ } else {
+ values.put("end", getDateString(year + 1));
+ values.put("less", "<");
+ }
+
+ String query = sub.replace(DAY_PARTITION_QUERY_TEMPLATE);
+ log.info("Histogram query: " + query);
+
+ histogram.add(parseDayBucketingHistogram(getRecordsForQuery(connector,
query)));
+ }
+
+ return histogram;
+ }
+
+ protected SalesforceConnector getConnector(State state) {
+ if (this.salesforceConnector == null) {
+ this.salesforceConnector = new SalesforceConnector(state);
+ }
+ return this.salesforceConnector;
+ }
Review Comment:
Is it possible to pass the connector from the source into this class instead
of re-initializing it? Connectors sometimes need to authenticate or perform an
SSL handshake and verification, which adds latency.
Issue Time Tracking
-------------------
Worklog Id: (was: 880083)
Time Spent: 20m (was: 10m)
> Refactor Salesforce Source to promote testability
> -------------------------------------------------
>
> Key: GOBBLIN-1904
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1904
> Project: Apache Gobblin
> Issue Type: Improvement
> Components: gobblin-core
> Reporter: William Lo
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 20m
> Remaining Estimate: 0h
>
> The Salesforce source is difficult to test because of its many internal classes
> whose inputs and outputs cannot be mocked. We should clean up this class and use
> composition to promote testability.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)