This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 2a1fc4603 [GOBBLIN-1917] Logging updates for Salesforce classes (#3786)
2a1fc4603 is described below
commit 2a1fc46038e75eefc27f46f30d52d2fd7b1e2d9d
Author: Gautam Kumar <[email protected]>
AuthorDate: Fri Sep 22 08:45:25 2023 +0530
[GOBBLIN-1917] Logging updates for Salesforce classes (#3786)
* Logging updates for Salesforce classes
- Updated the log messages in the Salesforce-related classes.
- In some cases, the exception was not included in the log; it is now
passed to the logger to make debugging easier.
- In some cases, more detail was added to the logs to report the current
progress of the tasks.
- Minor coding-style updates to improve code readability.
- There are no functional changes in this PR.
* Fixed logging of jobTaskMetrics
* Revert logging on cancel in GobblinHelixJobTask class
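For context, the recurring fix in this commit is the SLF4J convention of
passing the exception object as the final logger argument instead of
concatenating e.getMessage(): SLF4J then emits the full stack trace alongside
the formatted message, and {} placeholders (not String.format-style %s
markers) are substituted. A minimal, self-contained sketch of the
before/after pattern (the class and values below are illustrative, not taken
from this commit):

  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;

  public class LoggingPatternSketch {
    private static final Logger log = LoggerFactory.getLogger(LoggingPatternSketch.class);

    public void process(String jobId, long timeoutMs) {
      try {
        throw new IllegalStateException("simulated failure");
      } catch (IllegalStateException e) {
        // Before: placeholders are filled in, but the stack trace is lost.
        log.error("Job {} failed within {} seconds.", jobId, timeoutMs / 1000);
        // After: SLF4J treats a trailing Throwable as the exception to log,
        // so the full stack trace is printed with the formatted message.
        log.error("Job {} failed within {} seconds.", jobId, timeoutMs / 1000, e);
      }
    }
  }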
---
.../gobblin/cluster/GobblinHelixJobTask.java | 7 +-
.../apache/gobblin/runtime/SafeDatasetCommit.java | 2 +-
.../gobblin/salesforce/BulkResultIterator.java | 18 +--
.../gobblin/salesforce/SalesforceExtractor.java | 122 ++++++++-------------
.../salesforce/SalesforceHistogramService.java | 8 +-
.../gobblin/salesforce/SalesforceSource.java | 49 +++++----
6 files changed, 89 insertions(+), 117 deletions(-)
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
index 218d5834c..d463ddad9 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
@@ -174,7 +174,7 @@ class GobblinHelixJobTask implements Task {
try {
HelixUtils.deleteWorkflow(previousActualJobId, this.jobHelixManager, timeOut);
} catch (HelixException e) {
- log.error("Helix cannot delete previous actual job id {} within {} seconds.", previousActualJobId, timeOut / 1000);
+ log.error("Helix cannot delete previous actual job id {} within {} seconds.", previousActualJobId, timeOut / 1000, e);
return new TaskResult(TaskResult.Status.FAILED, ExceptionUtils.getFullStackTrace(e));
}
}
@@ -202,7 +202,7 @@ class GobblinHelixJobTask implements Task {
log.info("Completing planning job {}", this.planningJobId);
return new TaskResult(TaskResult.Status.COMPLETED, "");
} catch (Exception e) {
- log.info("Failing planning job {}", this.planningJobId);
+ log.warn("Failing planning job {}", this.planningJobId, e);
return new TaskResult(TaskResult.Status.FAILED, "Exception occurred for job " + planningJobId + ":" + ExceptionUtils
.getFullStackTrace(e));
} finally {
@@ -211,6 +211,7 @@ class GobblinHelixJobTask implements Task {
try {
this.jobsMapping.deleteMapping(jobUri);
} catch (Exception e) {
+ log.warn("Failed to delete jobs mapping for job: {}", jobUri, e);
return new TaskResult(TaskResult.Status.FAILED,"Cannot delete jobs mapping for job : " + jobUri);
}
}
@@ -230,7 +231,7 @@ class GobblinHelixJobTask implements Task {
try {
this.jobsMapping.deleteMapping(jobUri);
} catch (Exception e) {
- throw new RuntimeException("Cannot delete jobs mapping for job : " + jobUri);
+ throw new RuntimeException("Cannot delete jobs mapping for job : " + jobUri, e);
}
}
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
index ffd2683a2..55c9ebd76 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
@@ -104,7 +104,7 @@ final class SafeDatasetCommit implements Callable<Void> {
this.jobContext.getJobId(), this.jobContext.getJobCommitPolicy(), this.datasetState.getState()));
}
} catch (ReflectiveOperationException roe) {
- log.error("Failed to instantiate data publisher for dataset %s of job %s.", this.datasetUrn,
+ log.error("Failed to instantiate data publisher for dataset {} of job {}.", this.datasetUrn,
this.jobContext.getJobId(), roe);
throw new RuntimeException(roe);
} finally {
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java
index 0f0aac373..06bfd7043 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java
@@ -38,19 +38,19 @@ import org.apache.gobblin.source.extractor.utils.Utils;
*/
@Slf4j
public class BulkResultIterator implements Iterator<JsonElement> {
- private FileIdVO fileIdVO;
- private int retryLimit;
- private BulkConnection conn;
+ private final FileIdVO fileIdVO;
+ private final int retryLimit;
+ private final BulkConnection conn;
private InputStreamCSVReader csvReader;
private List<String> header;
private int columnSize;
private int lineCount = 0; // this is different than currentFileRowCount. csv file has header
- private long retryInterval;
- private long retryExceedQuotaInterval;
+ private final long retryInterval;
+ private final long retryExceedQuotaInterval;
private List<String> preLoadedLine = null;
public BulkResultIterator(BulkConnection conn, FileIdVO fileIdVO, int retryLimit, long retryInterval, long retryExceedQuotaInterval) {
- log.info("create BulkResultIterator: " + fileIdVO);
+ log.info("create BulkResultIterator: {} with retry limit as {} and retryInterval as {}", fileIdVO, retryLimit, retryInterval);
this.retryInterval = retryInterval;
this.retryExceedQuotaInterval = retryExceedQuotaInterval;
this.conn = conn;
@@ -87,17 +87,17 @@ public class BulkResultIterator implements Iterator<JsonElement> {
// Each organization is allowed 10 concurrent long-running requests. If the limit is reached,
// any new synchronous Apex request results in a runtime exception.
if (e.isCurrentExceptionExceedQuota()) {
- log.warn("--Caught ExceededQuota: " + e.getMessage());
+ log.warn("--Caught ExceededQuota: ", e);
threadSleep(retryExceedQuotaInterval);
executeCount--; // if the current exception is Quota Exceeded, keep trying forever
}
- log.info("***Retrying***1: {} - {}", fileIdVO, e.getMessage());
+ log.info("***Retrying***1: {} - Attempt {}/{}", fileIdVO, executeCount + 1, retryLimit, e);
this.csvReader = null; // in next loop, call openAndSeekCsvReader
} catch (Exception e) {
// Retry may resolve other exceptions.
rootCause = e;
threadSleep(retryInterval);
- log.info("***Retrying***2: {} - {}", fileIdVO, e.getMessage());
+ log.info("***Retrying***2: {} - Attempt {}/{}", fileIdVO, executeCount + 1, retryLimit, e);
this.csvReader = null; // in next loop, call openAndSeekCsvReader
}
}
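The retry loop patched above treats quota errors differently from other
failures: a quota-exceeded exception sleeps for the longer
retryExceedQuotaInterval and decrements executeCount, so it never consumes
one of the bounded attempts, while any other exception sleeps for
retryInterval and uses up an attempt. A condensed, standalone sketch of that
control flow, in which QuotaExceededException and loadRows are hypothetical
stand-ins for the Salesforce types:

  import java.util.Collections;
  import java.util.List;

  public class RetryLoopSketch {
    // Hypothetical stand-in for Salesforce's quota-exceeded error.
    static class QuotaExceededException extends RuntimeException {}

    private static final int RETRY_LIMIT = 3;
    private static final long RETRY_INTERVAL_MS = 1_000L;
    private static final long RETRY_EXCEED_QUOTA_INTERVAL_MS = 60_000L;

    public List<String> fetchWithRetry() throws InterruptedException {
      int executeCount = 0;
      Exception rootCause = null;
      while (executeCount < RETRY_LIMIT) {
        executeCount++;
        try {
          return loadRows();
        } catch (QuotaExceededException e) {
          // Each org is allowed a fixed number of concurrent long-running
          // requests; on a quota error, back off longer and give back the
          // attempt just spent.
          Thread.sleep(RETRY_EXCEED_QUOTA_INTERVAL_MS);
          executeCount--; // keep trying until the quota frees up
        } catch (Exception e) {
          rootCause = e; // other failures consume one of the bounded attempts
          Thread.sleep(RETRY_INTERVAL_MS);
        }
      }
      throw new RuntimeException("Retry limit exceeded", rootCause);
    }

    private List<String> loadRows() {
      return Collections.singletonList("row1"); // placeholder for the real CSV read
    }
  }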
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
index 50935ed60..c9a041971 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
@@ -24,10 +24,10 @@ import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
-import java.util.ListIterator;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -55,6 +55,7 @@ import com.sforce.async.BulkConnection;
import com.sforce.async.ConcurrencyMode;
import com.sforce.async.ContentType;
import com.sforce.async.JobInfo;
+import com.sforce.async.JobStateEnum;
import com.sforce.async.OperationEnum;
import com.sforce.async.QueryResultList;
import com.sforce.soap.partner.PartnerConnection;
@@ -115,9 +116,6 @@ public class SalesforceExtractor extends RestApiExtractor {
private BulkConnection bulkConnection = null;
private JobInfo bulkJob = new JobInfo();
- private List<BatchIdAndResultId> bulkResultIdList;
- private boolean bulkJobFinished = true;
- private boolean newBulkResultSet = true;
private final int pkChunkingSize;
private final SalesforceConnector sfConnector;
@@ -127,12 +125,11 @@ public class SalesforceExtractor extends RestApiExtractor {
private final long retryExceedQuotaInterval;
private final boolean bulkApiUseQueryAll;
- private SfConfig conf;
-
+ private boolean isPkChunkingFetchDone = false;
public SalesforceExtractor(WorkUnitState state) {
super(state);
- conf = new SfConfig(state.getProperties());
+ SfConfig conf = new SfConfig(state.getProperties());
this.sfConnector = (SalesforceConnector) this.connector;
this.pkChunkingSize = conf.pkChunkingSize;
@@ -162,22 +159,6 @@ public class SalesforceExtractor extends RestApiExtractor {
this.nextUrl = nextUrl;
}
- private boolean isBulkJobFinished() {
- return this.bulkJobFinished;
- }
-
- private void setBulkJobFinished(boolean bulkJobFinished) {
- this.bulkJobFinished = bulkJobFinished;
- }
-
- private boolean isNewBulkResultSet() {
- return this.newBulkResultSet;
- }
-
- private void setNewBulkResultSet(boolean newBulkResultSet) {
- this.newBulkResultSet = newBulkResultSet;
- }
-
@Override
public HttpEntity getAuthentication() throws RestApiConnectionException {
log.debug("Authenticating salesforce");
@@ -185,7 +166,7 @@ public class SalesforceExtractor extends RestApiExtractor {
}
@Override
- public List<Command> getSchemaMetadata(String schema, String entity) throws SchemaException {
+ public List<Command> getSchemaMetadata(String schema, String entity) {
log.debug("Build url to retrieve schema");
return constructGetCommand(this.sfConnector.getFullUri("/sobjects/" + entity.trim() + "/describe"));
}
@@ -265,12 +246,12 @@ public class SalesforceExtractor extends RestApiExtractor {
for (Predicate predicate : predicateList) {
query = SqlQueryUtils.addPredicate(query, predicate.getCondition());
}
- log.info("getHighWatermarkMetadata - QUERY: " + query);
+ log.info("getHighWatermarkMetadata - QUERY: {}", query);
try {
return constructGetCommand(this.sfConnector.getFullUri(getSoqlUrl(query)));
} catch (Exception e) {
- throw new HighWatermarkException("Failed to get salesforce url for high watermark; error - " + e.getMessage(), e);
+ throw new HighWatermarkException("Failed to get salesforce url for high watermark", e);
}
}
@@ -401,13 +382,11 @@ public class SalesforceExtractor extends RestApiExtractor {
String limitString = getLimitFromInputQuery(query);
query = query.replace(limitString, "");
- Iterator<Predicate> i = predicateList.listIterator();
- while (i.hasNext()) {
- Predicate predicate = i.next();
+ for (Predicate predicate : predicateList) {
query = SqlQueryUtils.addPredicate(query, predicate.getCondition());
}
- if (Boolean.valueOf(this.workUnitState.getProp(ConfigurationKeys.SOURCE_QUERYBASED_IS_SPECIFIC_API_ACTIVE))) {
+ if (Boolean.parseBoolean(this.workUnitState.getProp(ConfigurationKeys.SOURCE_QUERYBASED_IS_SPECIFIC_API_ACTIVE))) {
query = SqlQueryUtils.addPredicate(query, "IsDeleted = true");
}
@@ -442,7 +421,7 @@ public class SalesforceExtractor extends RestApiExtractor {
throw new DataRecordException("Failed to get data from salesforce; REST response has no output");
}
- List<JsonElement> rs = Lists.newArrayList();
+ List<JsonElement> records = Lists.newArrayList();
JsonElement element = GSON.fromJson(output, JsonObject.class);
JsonArray partRecords;
try {
@@ -457,14 +436,12 @@ public class SalesforceExtractor extends RestApiExtractor {
}
JsonArray array = Utils.removeElementFromJsonArray(partRecords, "attributes");
- Iterator<JsonElement> li = array.iterator();
- while (li.hasNext()) {
- JsonElement recordElement = li.next();
- rs.add(recordElement);
+ for (JsonElement recordElement : array) {
+ records.add(recordElement);
}
- return rs.iterator();
+ return records.iterator();
} catch (Exception e) {
- throw new DataRecordException("Failed to get records from salesforce; error - " + e.getMessage(), e);
+ throw new DataRecordException("Failed to get records from salesforce", e);
}
}
@@ -489,9 +466,7 @@ public class SalesforceExtractor extends RestApiExtractor {
private static String buildUrl(String path, List<NameValuePair> qparams) throws RestApiClientException {
URIBuilder builder = new URIBuilder();
builder.setPath(path);
- ListIterator<NameValuePair> i = qparams.listIterator();
- while (i.hasNext()) {
- NameValuePair keyValue = i.next();
+ for (NameValuePair keyValue : qparams) {
builder.setParameter(keyValue.getName(), keyValue.getValue());
}
URI uri;
@@ -504,10 +479,7 @@ public class SalesforceExtractor extends RestApiExtractor {
}
private static boolean isNullPredicate(List<Predicate> predicateList) {
- if (predicateList == null || predicateList.size() == 0) {
- return true;
- }
- return false;
+ return predicateList == null || predicateList.isEmpty();
}
@Override
@@ -525,27 +497,27 @@ public class SalesforceExtractor extends RestApiExtractor {
@Override
public String getHourPredicateCondition(String column, long value, String valueFormat, String operator) {
log.info("Getting hour predicate from salesforce");
- String formattedvalue = Utils.toDateTimeFormat(Long.toString(value), valueFormat, SALESFORCE_HOUR_FORMAT);
- return column + " " + operator + " " + formattedvalue;
+ String formattedValue = Utils.toDateTimeFormat(Long.toString(value), valueFormat, SALESFORCE_HOUR_FORMAT);
+ return column + " " + operator + " " + formattedValue;
}
@Override
public String getDatePredicateCondition(String column, long value, String valueFormat, String operator) {
log.info("Getting date predicate from salesforce");
- String formattedvalue = Utils.toDateTimeFormat(Long.toString(value), valueFormat, SALESFORCE_DATE_FORMAT);
- return column + " " + operator + " " + formattedvalue;
+ String formattedValue = Utils.toDateTimeFormat(Long.toString(value), valueFormat, SALESFORCE_DATE_FORMAT);
+ return column + " " + operator + " " + formattedValue;
}
@Override
public String getTimestampPredicateCondition(String column, long value, String valueFormat, String operator) {
log.info("Getting timestamp predicate from salesforce");
- String formattedvalue = Utils.toDateTimeFormat(Long.toString(value), valueFormat, SALESFORCE_TIMESTAMP_FORMAT);
- return column + " " + operator + " " + formattedvalue;
+ String formattedValue = Utils.toDateTimeFormat(Long.toString(value), valueFormat, SALESFORCE_TIMESTAMP_FORMAT);
+ return column + " " + operator + " " + formattedValue;
}
@Override
public Map<String, String> getDataTypeMap() {
- Map<String, String> dataTypeMap = ImmutableMap.<String, String>builder().put("url", "string")
+ return ImmutableMap.<String, String>builder().put("url", "string")
.put("textarea", "string").put("reference", "string").put("phone", "string").put("masterrecord", "string")
.put("location", "string").put("id", "string").put("encryptedstring", "string").put("email", "string")
.put("DataCategoryGroupReference", "string").put("calculated", "string").put("anyType", "string")
@@ -554,11 +526,8 @@ public class SalesforceExtractor extends RestApiExtractor {
.put("double", "double").put("percent", "double").put("currency", "double").put("decimal", "double")
.put("boolean", "boolean").put("picklist", "string").put("multipicklist", "string").put("combobox", "string")
.put("list", "string").put("set", "string").put("map", "string").put("enum", "string").build();
- return dataTypeMap;
}
- private Boolean isPkChunkingFetchDone = false;
-
private Iterator<JsonElement> fetchRecordSetPkChunking(WorkUnit workUnit) {
if (isPkChunkingFetchDone) {
return null; // must return null to represent no more data.
@@ -592,19 +561,17 @@ public class SalesforceExtractor extends RestApiExtractor {
isBulkFetchDone = true;
log.info("----Get records for bulk batch job----");
try {
- // set finish status to false before starting the bulk job
- this.setBulkJobFinished(false);
- this.bulkResultIdList = getQueryResultIds(entity, predicateList);
- log.info("Number of bulk api resultSet Ids:" + this.bulkResultIdList.size());
- List<FileIdVO> fileIdVoList = this.bulkResultIdList.stream()
- .map(x -> new FileIdVO(this.bulkJob.getId(), x.batchId, x.resultId))
+ List<BatchIdAndResultId> batchIdAndResultIds = getQueryResultIds(entity, predicateList);
+ log.info("Number of bulk api resultSet Ids:" + batchIdAndResultIds.size());
+ List<FileIdVO> fileIdVoList = batchIdAndResultIds.stream()
+ .map(batchIdAndResultId -> new FileIdVO(this.bulkJob.getId(), batchIdAndResultId.batchId, batchIdAndResultId.resultId))
.collect(Collectors.toList());
ResultChainingIterator chainingIter = new ResultChainingIterator(
bulkConnection, fileIdVoList, retryLimit, retryInterval, retryExceedQuotaInterval);
chainingIter.add(getSoftDeletedRecords(schema, entity, workUnit, predicateList));
return chainingIter;
} catch (Exception e) {
- throw new RuntimeException("Failed to get records using bulk api; error - " + e.getMessage(), e);
+ throw new RuntimeException("Failed to get records using bulk api", e);
}
}
@@ -764,9 +731,7 @@ public class SalesforceExtractor extends RestApiExtractor {
if (!isNullPredicate(predicateList)) {
String limitString = getLimitFromInputQuery(query);
query = query.replace(limitString, "");
- Iterator<Predicate> i = predicateList.listIterator();
- while (i.hasNext()) {
- Predicate predicate = i.next();
+ for (Predicate predicate : predicateList) {
query = SqlQueryUtils.addPredicate(query, predicate.getCondition());
}
query = query + limitString;
@@ -797,8 +762,7 @@ public class SalesforceExtractor extends RestApiExtractor {
log.error("Bulk batch failed: " + batchResponse.toString());
throw new Exception("Failed to get bulk batch info for jobId " + jobId
+ " error - " + batchResponse.getStateMessage());
}
- ResultFileIdsStruct resultFileIdsStruct = retrievePkChunkingResultIds(connection, jobId, waitMilliSecond);
- return resultFileIdsStruct;
+ return retrievePkChunkingResultIds(connection, jobId, waitMilliSecond);
} catch (Exception e) {
throw new RuntimeException("getQueryResultIdsPkChunking: error - " + e.getMessage(), e);
}
@@ -810,7 +774,7 @@ public class SalesforceExtractor extends RestApiExtractor {
* @param predicateList of all predicate conditions
* @return iterator with batch of records
*/
- private List<BatchIdAndResultId> getQueryResultIds(String entity, List<Predicate> predicateList) throws Exception {
+ private List<BatchIdAndResultId> getQueryResultIds(String entity, List<Predicate> predicateList) {
bulkApiLogin();
try {
// Set bulk job attributes
@@ -832,16 +796,14 @@ public class SalesforceExtractor extends RestApiExtractor {
String limitString = getLimitFromInputQuery(query);
query = query.replace(limitString, "");
- Iterator<Predicate> i = predicateList.listIterator();
- while (i.hasNext()) {
- Predicate predicate = i.next();
+ for (Predicate predicate : predicateList) {
query = SqlQueryUtils.addPredicate(query, predicate.getCondition());
}
query = query + limitString;
}
- log.info("getQueryResultIds - QUERY:" + query);
+ log.info("getQueryResultIds - QUERY: {}", query);
ByteArrayInputStream bout = new ByteArrayInputStream(query.getBytes(ConfigurationKeys.DEFAULT_CHARSET_ENCODING));
BatchInfo bulkBatchInfo = this.bulkConnection.createBatchFromStream(this.bulkJob, bout);
@@ -859,7 +821,10 @@ public class SalesforceExtractor extends RestApiExtractor {
ConfigurationKeys.EXTRACT_SALESFORCE_BULK_API_MAX_WAIT_TIME_IN_MILLIS_KEY,
ConfigurationKeys.DEFAULT_EXTRACT_SALESFORCE_BULK_API_MAX_WAIT_TIME_IN_MILLIS);
while (bulkBatchInfo.getState() == BatchStateEnum.InProgress || bulkBatchInfo.getState() == BatchStateEnum.Queued) {
- log.info("Waiting for bulk resultSetIds");
+ log.info("Waiting for bulk resultSetIds | Job ID: {} | State: {} | State message: {} |"
+     + " Num records processed: {} | Num records failed: {}",
+     bulkBatchInfo.getJobId(), bulkBatchInfo.getState(), bulkBatchInfo.getStateMessage(),
+     bulkBatchInfo.getNumberRecordsProcessed(), bulkBatchInfo.getNumberRecordsFailed());
// Exponential backoff
long waitMilliSeconds = Math.min((long) (Math.pow(2, count) * minWaitTimeInMilliSeconds), maxWaitTimeInMilliSeconds);
Thread.sleep(waitMilliSeconds);
@@ -873,7 +838,7 @@ public class SalesforceExtractor extends RestApiExtractor {
// not sure if the state can be "NotProcessed" in non-pk-chunking bulk request
// SFDC doc says - This state is assigned when a job is aborted while the batch is queued
if (state == BatchStateEnum.Failed || state == BatchStateEnum.InProgress) {
- log.error("Bulk batch failed: " + bulkBatchInfo.toString());
+ log.error("Bulk batch failed: {}", bulkBatchInfo);
throw new RuntimeException("Failed to get bulk batch info for jobId " + bulkBatchInfo.getJobId()
+ " error - " + bulkBatchInfo.getStateMessage());
}
@@ -893,15 +858,14 @@ public class SalesforceExtractor extends RestApiExtractor {
return batchIdAndResultIdList;
} catch (RuntimeException | AsyncApiException | InterruptedException e) {
- throw new RuntimeException(
- "Failed to get query result ids from salesforce using bulk api; error - " + e.getMessage(), e);
+ throw new RuntimeException("Failed to get query result ids from salesforce using bulk api", e);
}
}
@Override
public void closeConnection() throws Exception {
if (this.bulkConnection != null
- && !this.bulkConnection.getJobStatus(this.getBulkJobId()).getState().toString().equals("Closed")) {
+ && !JobStateEnum.Closed.equals(this.bulkConnection.getJobStatus(this.getBulkJobId()).getState())) {
log.info("Closing salesforce bulk job connection");
this.bulkConnection.closeJob(this.getBulkJobId());
}
@@ -909,11 +873,11 @@ public class SalesforceExtractor extends RestApiExtractor {
}
private static List<Command> constructGetCommand(String restQuery) {
- return Arrays.asList(new RestApiCommand().build(Arrays.asList(restQuery), RestApiCommandType.GET));
+ return Collections.singletonList(new RestApiCommand().build(Collections.singletonList(restQuery), RestApiCommandType.GET));
}
private ResultFileIdsStruct retrievePkChunkingResultIdsByBatchId(BulkConnection connection, String jobId, String batchIdListStr) {
- Iterator<String> batchIds = Arrays.stream(batchIdListStr.split(",")).map(x -> x.trim()).filter(x -> !x.equals("")).iterator();
+ Iterator<String> batchIds = Arrays.stream(batchIdListStr.split(",")).map(String::trim).filter(x -> !x.isEmpty()).iterator();
try {
List<BatchIdAndResultId> batchIdAndResultIdList = fetchBatchResultIds(connection, jobId, batchIds);
return new ResultFileIdsStruct(jobId, batchIdAndResultIdList);
@@ -938,7 +902,7 @@ public class SalesforceExtractor extends RestApiExtractor {
throw new Exception("PK-Chunking query should have 1 and only 1 batch with state=NotProcessed.");
}
Stream<BatchInfo> stream = Arrays.stream(batchInfos);
- Iterator<String> batchIds = stream.filter(x -> x.getNumberRecordsProcessed() != 0).map(x -> x.getId()).iterator();
+ Iterator<String> batchIds = stream.filter(x -> x.getNumberRecordsProcessed() != 0).map(BatchInfo::getId).iterator();
List<BatchIdAndResultId> batchIdAndResultIdList = fetchBatchResultIds(connection, jobId, batchIds);
return new ResultFileIdsStruct(jobId, batchIdAndResultIdList);
}
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceHistogramService.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceHistogramService.java
index ec09da57a..17d277ba2 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceHistogramService.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceHistogramService.java
@@ -188,7 +188,7 @@ public class SalesforceHistogramService {
}
// make a copy of the histogram list and add a dummy entry at the end to avoid special processing of the last group
- List<HistogramGroup> list = new ArrayList(histogram.getGroups());
+ List<HistogramGroup> list = new ArrayList<>(histogram.getGroups());
Date hwmDate = Utils.toDate(partition.getHighWatermark(), Partitioner.WATERMARKTIMEFORMAT);
list.add(new HistogramGroup(Utils.epochToDate(hwmDate.getTime(), SalesforceSource.SECONDS_FORMAT), 0));
@@ -285,12 +285,12 @@ public class SalesforceHistogramService {
int countLeft = getCountForRange(probingContext, sub, values, startEpoch, midpointEpoch);
getHistogramRecursively(probingContext, histogram, sub, values, countLeft, startEpoch, midpointEpoch);
- log.debug("Count {} for left partition {} to {}", countLeft, startEpoch, midpointEpoch);
+ log.info("Count {} for left partition {} to {}", countLeft, startEpoch, midpointEpoch);
int countRight = count - countLeft;
getHistogramRecursively(probingContext, histogram, sub, values, countRight, midpointEpoch, endEpoch);
- log.debug("Count {} for right partition {} to {}", countRight, midpointEpoch, endEpoch);
+ log.info("Count {} for right partition {} to {}", countRight, midpointEpoch, endEpoch);
}
@@ -339,7 +339,7 @@ public class SalesforceHistogramService {
String query = sub.replace(PROBE_PARTITION_QUERY_TEMPLATE);
- log.debug("Count query: " + query);
+ log.info("Count query: {}", query);
probingContext.probeCount++;
JsonArray records = getRecordsForQuery(probingContext.connector, query);
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
index aae7681e1..a7fd12b53 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
@@ -17,6 +17,18 @@
package org.apache.gobblin.salesforce;
+import java.io.IOException;
+import java.math.RoundingMode;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
@@ -29,17 +41,9 @@ import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
-import java.io.IOException;
-import java.math.RoundingMode;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
+
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
@@ -64,7 +68,7 @@ import org.apache.gobblin.source.extractor.watermark.WatermarkType;
import org.apache.gobblin.source.workunit.Extract;
import org.apache.gobblin.source.workunit.WorkUnit;
-import static org.apache.gobblin.configuration.ConfigurationKeys.*;
+import static org.apache.gobblin.configuration.ConfigurationKeys.SOURCE_MAX_NUMBER_OF_PARTITIONS;
import static org.apache.gobblin.salesforce.SalesforceConfigurationKeys.*;
/**
@@ -222,7 +226,7 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
/**
* Create work units by taking a bulkJobId.
- * The work units won't contain a query in this case. Instead they will contain a BulkJobId and a list of `batchId:resultId`
+ * The work units won't contain a query in this case. Instead, they will contain a BulkJobId and a list of `batchId:resultId`
* So in extractor, the work to do is just to fetch the resultSet files.
*/
private List<WorkUnit> createWorkUnits(
@@ -359,21 +363,24 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
DescriptiveStatistics statistics = new DescriptiveStatistics();
int count = 0;
- HistogramGroup group;
- Iterator<HistogramGroup> it = groups.iterator();
- while (it.hasNext()) {
- group = it.next();
+ /*
+ Using greedy algorithm by keep adding group until it exceeds the interval size (x2)
+ Proof: Assuming nth group violates 2 x interval size, then all groups from 0th to (n-1)th, plus nth group,
+ will have total size larger or equal to interval x 2. Hence, we are saturating all intervals (with original size)
+ without leaving any unused space in between. We could choose x3,x4... but it is not space efficient.
+ */
+ for (HistogramGroup group : groups) {
if (count == 0) {
// Add a new partition point;
partitionPoints.add(Utils.toDateTimeFormat(group.getKey(), SECONDS_FORMAT, Partitioner.WATERMARKTIMEFORMAT));
}
- /**
- * Using greedy algorithm by keep adding group until it exceeds the interval size (x2)
- * Proof: Assuming nth group violates 2 x interval size, then all groups from 0th to (n-1)th, plus nth group,
- * will have total size larger or equal to interval x 2. Hence, we are saturating all intervals (with original size)
- * without leaving any unused space in between. We could choose x3,x4... but it is not space efficient.
+ /*
+ Using greedy algorithm by keep adding group until it exceeds the interval size (x2)
+ Proof: Assuming nth group violates 2 x interval size, then all groups from 0th to (n-1)th, plus nth group,
+ will have total size larger or equal to interval x 2. Hence, we are saturating all intervals (with original size)
+ without leaving any unused space in between. We could choose x3,x4... but it is not space efficient.
*/
if (count != 0 && count + group.getCount() >= 2 * interval) {
// Summarize current group