Will-Lo commented on code in PR #3768: URL: https://github.com/apache/gobblin/pull/3768#discussion_r1323861172
########## gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceHistogramService.java: ########## @@ -0,0 +1,381 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.salesforce; + +import com.google.common.math.DoubleMath; +import com.google.gson.Gson; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import java.math.RoundingMode; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.Date; +import java.util.GregorianCalendar; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.text.StrSubstitutor; +import org.apache.gobblin.configuration.SourceState; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.source.extractor.DataRecordException; +import org.apache.gobblin.source.extractor.exception.RestApiClientException; +import org.apache.gobblin.source.extractor.exception.RestApiConnectionException; +import org.apache.gobblin.source.extractor.exception.RestApiProcessingException; +import 
org.apache.gobblin.source.extractor.extract.Command;
import org.apache.gobblin.source.extractor.extract.CommandOutput;
import org.apache.gobblin.source.extractor.extract.restapi.RestApiConnector;
import org.apache.gobblin.source.extractor.partition.Partition;
import org.apache.gobblin.source.extractor.partition.Partitioner;
import org.apache.gobblin.source.extractor.utils.Utils;

import static org.apache.gobblin.configuration.ConfigurationKeys.*;


/**
 * This class encapsulates everything related to histogram calculation for Salesforce. A histogram here refers to a
 * mapping of number of records to be fetched by time intervals.
 */
@Slf4j
public class SalesforceHistogramService {
  // Lower bound on a refined split's width; presumably prevents probing ever-smaller intervals — TODO confirm
  // against the refinement logic further down the file.
  private static final int MIN_SPLIT_TIME_MILLIS = 1000;
  // Suffix appended to day-only date strings to turn them into midnight timestamps — TODO confirm at usage site.
  private static final String ZERO_TIME_SUFFIX = "-00:00:00";
  // Shared, thread-safe JSON (de)serializer cached once for the class.
  private static final Gson GSON = new Gson();
  // this is used to generate histogram buckets smaller than the target partition size to allow for more even
  // packing of the generated partitions
  private static final String PROBE_TARGET_RATIO = "salesforce.probeTargetRatio";
  private static final double DEFAULT_PROBE_TARGET_RATIO = 0.60;
  // Config key and default for the maximum number of probe queries issued during dynamic refinement.
  private static final String DYNAMIC_PROBING_LIMIT = "salesforce.dynamicProbingLimit";
  private static final int DEFAULT_DYNAMIC_PROBING_LIMIT = 1000;

  // SOQL template: per-day record counts over a watermark window; ${greater}/${less} are filled with
  // >=, >, <=, < depending on watermark inclusivity.
  private static final String DAY_PARTITION_QUERY_TEMPLATE =
      "SELECT count(${column}) cnt, DAY_ONLY(${column}) time FROM ${table} " + "WHERE ${column} ${greater} ${start}"
          + " AND ${column} ${less} ${end} GROUP BY DAY_ONLY(${column}) ORDER BY DAY_ONLY(${column})";
  // SOQL template: a single record count over a sub-interval, used when probing/refining a bucket.
  private static final String PROBE_PARTITION_QUERY_TEMPLATE = "SELECT count(${column}) cnt FROM ${table} "
      + "WHERE ${column} ${greater} ${start} AND ${column} ${less} ${end}";

  // Lazily created by getConnector(State); protected so a subclass (or the source) can supply its own instance.
  protected SalesforceConnector salesforceConnector = null;
  private final SfConfig sfConfig;

  /**
   * Creates a histogram service backed by the given Salesforce configuration.
   *
   * @param sfConfig Salesforce connection/query configuration used by this service
   */
  SalesforceHistogramService(SfConfig sfConfig) {
    this.sfConfig = sfConfig;
  }

  /**
   * Generate the histogram of record counts per time interval for the given entity/watermark column,
   * bounded by the supplied partition's watermarks.
   */
Histogram getHistogram(String entity, String watermarkColumn, SourceState state, + Partition partition) { + SalesforceConnector connector = getConnector(state); + + try { + if (!connector.connect()) { + throw new RuntimeException("Failed to connect."); + } + } catch (RestApiConnectionException e) { + throw new RuntimeException("Failed to connect.", e); + } + + Histogram histogram = getHistogramByDayBucketing(connector, entity, watermarkColumn, partition); + + // exchange the first histogram group key with the global low watermark to ensure that the low watermark is captured + // in the range of generated partitions + HistogramGroup firstGroup = histogram.get(0); + Date lwmDate = Utils.toDate(partition.getLowWatermark(), Partitioner.WATERMARKTIMEFORMAT); + histogram.getGroups().set(0, new HistogramGroup(Utils.epochToDate(lwmDate.getTime(), SalesforceSource.SECONDS_FORMAT), + firstGroup.getCount())); + + // refine the histogram + if (state.getPropAsBoolean(SalesforceSource.ENABLE_DYNAMIC_PROBING)) { + histogram = getRefinedHistogram(connector, entity, watermarkColumn, state, partition, histogram); + } + + return histogram; + } + + /** + * Get a histogram with day granularity buckets. 
   */
  private Histogram getHistogramByDayBucketing(SalesforceConnector connector, String entity, String watermarkColumn,
      Partition partition) {
    Histogram histogram = new Histogram();

    // Extract the calendar year and a Salesforce-formatted timestamp from each watermark; the query window is
    // then sliced per year so no single SOQL aggregate spans more than one year.
    Calendar calendar = new GregorianCalendar();
    Date startDate = Utils.toDate(partition.getLowWatermark(), Partitioner.WATERMARKTIMEFORMAT);
    calendar.setTime(startDate);
    int startYear = calendar.get(Calendar.YEAR);
    String lowWatermarkDate = Utils.dateToString(startDate, SalesforceExtractor.SALESFORCE_TIMESTAMP_FORMAT);

    Date endDate = Utils.toDate(partition.getHighWatermark(), Partitioner.WATERMARKTIMEFORMAT);
    calendar.setTime(endDate);
    int endYear = calendar.get(Calendar.YEAR);
    String highWatermarkDate = Utils.dateToString(endDate, SalesforceExtractor.SALESFORCE_TIMESTAMP_FORMAT);

    // Shared substitution map for the query template; per-iteration keys (start/end/greater/less) are
    // overwritten inside the loop while table/column stay fixed.
    Map<String, String> values = new HashMap<>();
    values.put("table", entity);
    values.put("column", watermarkColumn);
    StrSubstitutor sub = new StrSubstitutor(values);

    // One day-bucketed count query per calendar year in [startYear, endYear].
    for (int year = startYear; year <= endYear; year++) {
      // First slice starts at the actual low watermark and honors its inclusivity; later slices start at the
      // year boundary (getDateString(year) — presumably Jan 1 of that year, TODO confirm) and are inclusive.
      if (year == startYear) {
        values.put("start", lowWatermarkDate);
        values.put("greater", partition.isLowWatermarkInclusive() ? ">=" : ">");
      } else {
        values.put("start", getDateString(year));
        values.put("greater", ">=");
      }

      // Last slice ends at the actual high watermark and honors its inclusivity; earlier slices end
      // exclusively at the next year boundary so adjacent slices never double-count a record.
      if (year == endYear) {
        values.put("end", highWatermarkDate);
        values.put("less", partition.isHighWatermarkInclusive() ? "<=" : "<");
      } else {
        values.put("end", getDateString(year + 1));
        values.put("less", "<");
      }

      String query = sub.replace(DAY_PARTITION_QUERY_TEMPLATE);
      log.info("Histogram query: " + query);

      // Run the count query and fold its per-day groups into the accumulated histogram.
      histogram.add(parseDayBucketingHistogram(getRecordsForQuery(connector, query)));
    }

    return histogram;
  }

  /**
   * Lazily creates (and caches) the Salesforce connector for the given state.
   *
   * NOTE(review): consider having the source inject its connector instead of constructing a new one here —
   * connectors can perform authentication/SSL handshakes, which adds latency (see PR discussion below).
   */
  protected SalesforceConnector getConnector(State state) {
    if (this.salesforceConnector == null) {
      this.salesforceConnector = new SalesforceConnector(state);
    }
    return this.salesforceConnector;
  }
 Review Comment: Is it possible to send the connector from the source into this class instead of reinitializing it? Connectors can sometimes perform authentication/require SSL handshake and verification which adds latency. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
