This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a1fc4603 [GOBBLIN-1917] Logging updates for Salesforce classes (#3786)
2a1fc4603 is described below

commit 2a1fc46038e75eefc27f46f30d52d2fd7b1e2d9d
Author: Gautam Kumar <[email protected]>
AuthorDate: Fri Sep 22 08:45:25 2023 +0530

    [GOBBLIN-1917] Logging updates for Salesforce classes (#3786)
    
    * Logging updates for Salesforce classes
    
    - Updated the log messages in the Salesforce-related classes.
    - In some cases, the exception was not included in the log; it is now passed to the logger to make debugging issues easier (see the sketch at the end of this commit message).
    - In some cases, added more detail to the logs to show the current progress of the tasks.
    - Minor coding-style updates to improve code readability.
    - There are no functional changes in this PR.
    
    * Fixed logging of jobTaskMetrics
    
    * Revert logging on cancel in GobblinHelixJobTask class
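    
    For context, the exception-logging fixes above rely on two SLF4J
    parameterized-logging rules: "{}" placeholders are substituted
    positionally (printf-style "%s" is not substituted at all), and a
    Throwable passed as the final argument, beyond the last placeholder,
    is printed with its full stack trace. A minimal sketch of both rules;
    the class name and job id below are illustrative, not taken from this PR:
    
        import org.slf4j.Logger;
        import org.slf4j.LoggerFactory;
    
        public class Slf4jLoggingSketch {
          private static final Logger log = LoggerFactory.getLogger(Slf4jLoggingSketch.class);
    
          public static void main(String[] args) {
            String jobId = "job-123"; // hypothetical job id, for illustration only
            Exception e = new RuntimeException("boom");
    
            log.error("Failing planning job %s", jobId);    // bug: SLF4J leaves %s as-is
            log.error("Failing planning job {}", jobId);    // message only, stack trace lost
            log.error("Failing planning job {}", jobId, e); // message plus full stack trace of e
          }
        }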
---
 .../gobblin/cluster/GobblinHelixJobTask.java       |   7 +-
 .../apache/gobblin/runtime/SafeDatasetCommit.java  |   2 +-
 .../gobblin/salesforce/BulkResultIterator.java     |  18 +--
 .../gobblin/salesforce/SalesforceExtractor.java    | 122 ++++++++-------------
 .../salesforce/SalesforceHistogramService.java     |   8 +-
 .../gobblin/salesforce/SalesforceSource.java       |  49 +++++----
 6 files changed, 89 insertions(+), 117 deletions(-)

diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
index 218d5834c..d463ddad9 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
@@ -174,7 +174,7 @@ class GobblinHelixJobTask implements Task {
             try {
              HelixUtils.deleteWorkflow(previousActualJobId, this.jobHelixManager, timeOut);
            } catch (HelixException e) {
-              log.error("Helix cannot delete previous actual job id {} within {} seconds.", previousActualJobId, timeOut / 1000);
+              log.error("Helix cannot delete previous actual job id {} within {} seconds.", previousActualJobId, timeOut / 1000, e);
              return new TaskResult(TaskResult.Status.FAILED, ExceptionUtils.getFullStackTrace(e));
             }
           }
@@ -202,7 +202,7 @@ class GobblinHelixJobTask implements Task {
       log.info("Completing planning job {}", this.planningJobId);
       return new TaskResult(TaskResult.Status.COMPLETED, "");
     } catch (Exception e) {
-      log.info("Failing planning job {}", this.planningJobId);
+      log.warn("Failing planning job {}", this.planningJobId, e);
       return new TaskResult(TaskResult.Status.FAILED, "Exception occurred for 
job " + planningJobId + ":" + ExceptionUtils
           .getFullStackTrace(e));
     } finally {
@@ -211,6 +211,7 @@ class GobblinHelixJobTask implements Task {
       try {
         this.jobsMapping.deleteMapping(jobUri);
       } catch (Exception e) {
+        log.warn("Failed to delete jobs mapping for job: {}", jobUri, e);
         return new TaskResult(TaskResult.Status.FAILED,"Cannot delete jobs 
mapping for job : " + jobUri);
       }
     }
@@ -230,7 +231,7 @@ class GobblinHelixJobTask implements Task {
         try {
           this.jobsMapping.deleteMapping(jobUri);
         } catch (Exception e) {
-          throw new RuntimeException("Cannot delete jobs mapping for job : " + 
jobUri);
+          throw new RuntimeException("Cannot delete jobs mapping for job : " + 
jobUri, e);
         }
       }
     }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
index ffd2683a2..55c9ebd76 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
@@ -104,7 +104,7 @@ final class SafeDatasetCommit implements Callable<Void> {
                this.jobContext.getJobId(), this.jobContext.getJobCommitPolicy(), this.datasetState.getState()));
       }
     } catch (ReflectiveOperationException roe) {
-      log.error("Failed to instantiate data publisher for dataset %s of job 
%s.", this.datasetUrn,
+      log.error("Failed to instantiate data publisher for dataset {} of job 
{}.", this.datasetUrn,
           this.jobContext.getJobId(), roe);
       throw new RuntimeException(roe);
     } finally {
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java
index 0f0aac373..06bfd7043 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java
@@ -38,19 +38,19 @@ import org.apache.gobblin.source.extractor.utils.Utils;
  */
 @Slf4j
 public class BulkResultIterator implements Iterator<JsonElement> {
-  private FileIdVO fileIdVO;
-  private int retryLimit;
-  private BulkConnection conn;
+  private final FileIdVO fileIdVO;
+  private final int retryLimit;
+  private final BulkConnection conn;
   private InputStreamCSVReader csvReader;
   private List<String> header;
   private int columnSize;
   private int lineCount = 0; // this is different than currentFileRowCount. cvs file has header
-  private long retryInterval;
-  private long retryExceedQuotaInterval;
+  private final long retryInterval;
+  private final long retryExceedQuotaInterval;
   private List<String> preLoadedLine = null;
 
  public BulkResultIterator(BulkConnection conn, FileIdVO fileIdVO, int retryLimit, long retryInterval, long retryExceedQuotaInterval) {
-    log.info("create BulkResultIterator: " + fileIdVO);
+    log.info("create BulkResultIterator: {} with retry limit as {} and retryInterval as {}", fileIdVO, retryLimit, retryInterval);
     this.retryInterval = retryInterval;
     this.retryExceedQuotaInterval = retryExceedQuotaInterval;
     this.conn = conn;
@@ -87,17 +87,17 @@ public class BulkResultIterator implements Iterator<JsonElement> {
         // Each organization is allowed 10 concurrent long-running requests. If the limit is reached,
         // any new synchronous Apex request results in a runtime exception.
         if (e.isCurrentExceptionExceedQuota()) {
-          log.warn("--Caught ExceededQuota: " + e.getMessage());
+          log.warn("--Caught ExceededQuota: ", e);
           threadSleep(retryExceedQuotaInterval);
           executeCount--; // if the current exception is Quota Exceeded, keep trying forever
         }
-        log.info("***Retrying***1: {} - {}", fileIdVO, e.getMessage());
+        log.info("***Retrying***1: {} - Attempt {}/{}", fileIdVO, executeCount 
+ 1, retryLimit, e);
         this.csvReader = null; // in next loop, call openAndSeekCsvReader
       } catch (Exception e) {
         // Retry may resolve other exceptions.
         rootCause = e;
         threadSleep(retryInterval);
-        log.info("***Retrying***2: {} - {}", fileIdVO, e.getMessage());
+        log.info("***Retrying***2: {} - Attempt {}/{}", fileIdVO, executeCount 
+ 1, retryLimit, e);
         this.csvReader = null; // in next loop, call openAndSeekCsvReader
       }
     }
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
index 50935ed60..c9a041971 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
@@ -24,10 +24,10 @@ import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
-import java.util.ListIterator;
 import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -55,6 +55,7 @@ import com.sforce.async.BulkConnection;
 import com.sforce.async.ConcurrencyMode;
 import com.sforce.async.ContentType;
 import com.sforce.async.JobInfo;
+import com.sforce.async.JobStateEnum;
 import com.sforce.async.OperationEnum;
 import com.sforce.async.QueryResultList;
 import com.sforce.soap.partner.PartnerConnection;
@@ -115,9 +116,6 @@ public class SalesforceExtractor extends RestApiExtractor {
 
   private BulkConnection bulkConnection = null;
   private JobInfo bulkJob = new JobInfo();
-  private List<BatchIdAndResultId> bulkResultIdList;
-  private boolean bulkJobFinished = true;
-  private boolean newBulkResultSet = true;
 
   private final int pkChunkingSize;
   private final SalesforceConnector sfConnector;
@@ -127,12 +125,11 @@ public class SalesforceExtractor extends RestApiExtractor {
   private final long retryExceedQuotaInterval;
 
   private final boolean bulkApiUseQueryAll;
-  private SfConfig conf;
-
+  private boolean isPkChunkingFetchDone = false;
 
   public SalesforceExtractor(WorkUnitState state) {
     super(state);
-    conf = new SfConfig(state.getProperties());
+    SfConfig conf = new SfConfig(state.getProperties());
 
     this.sfConnector = (SalesforceConnector) this.connector;
     this.pkChunkingSize = conf.pkChunkingSize;
@@ -162,22 +159,6 @@ public class SalesforceExtractor extends RestApiExtractor {
     this.nextUrl = nextUrl;
   }
 
-  private boolean isBulkJobFinished() {
-    return this.bulkJobFinished;
-  }
-
-  private void setBulkJobFinished(boolean bulkJobFinished) {
-    this.bulkJobFinished = bulkJobFinished;
-  }
-
-  private boolean isNewBulkResultSet() {
-    return this.newBulkResultSet;
-  }
-
-  private void setNewBulkResultSet(boolean newBulkResultSet) {
-    this.newBulkResultSet = newBulkResultSet;
-  }
-
   @Override
   public HttpEntity getAuthentication() throws RestApiConnectionException {
     log.debug("Authenticating salesforce");
@@ -185,7 +166,7 @@ public class SalesforceExtractor extends RestApiExtractor {
   }
 
   @Override
-  public List<Command> getSchemaMetadata(String schema, String entity) throws SchemaException {
+  public List<Command> getSchemaMetadata(String schema, String entity) {
     log.debug("Build url to retrieve schema");
     return constructGetCommand(this.sfConnector.getFullUri("/sobjects/" + 
entity.trim() + "/describe"));
   }
@@ -265,12 +246,12 @@ public class SalesforceExtractor extends RestApiExtractor {
     for (Predicate predicate : predicateList) {
       query = SqlQueryUtils.addPredicate(query, predicate.getCondition());
     }
-    log.info("getHighWatermarkMetadata - QUERY: " + query);
+    log.info("getHighWatermarkMetadata - QUERY: {}", query);
 
     try {
      return constructGetCommand(this.sfConnector.getFullUri(getSoqlUrl(query)));
     } catch (Exception e) {
-      throw new HighWatermarkException("Failed to get salesforce url for high watermark; error - " + e.getMessage(), e);
+      throw new HighWatermarkException("Failed to get salesforce url for high watermark", e);
     }
   }
 
@@ -401,13 +382,11 @@ public class SalesforceExtractor extends RestApiExtractor {
         String limitString = getLimitFromInputQuery(query);
         query = query.replace(limitString, "");
 
-        Iterator<Predicate> i = predicateList.listIterator();
-        while (i.hasNext()) {
-          Predicate predicate = i.next();
+        for (Predicate predicate : predicateList) {
           query = SqlQueryUtils.addPredicate(query, predicate.getCondition());
         }
 
-        if (Boolean.valueOf(this.workUnitState.getProp(ConfigurationKeys.SOURCE_QUERYBASED_IS_SPECIFIC_API_ACTIVE))) {
+        if (Boolean.parseBoolean(this.workUnitState.getProp(ConfigurationKeys.SOURCE_QUERYBASED_IS_SPECIFIC_API_ACTIVE))) {
           query = SqlQueryUtils.addPredicate(query, "IsDeleted = true");
         }
 
@@ -442,7 +421,7 @@ public class SalesforceExtractor extends RestApiExtractor {
       throw new DataRecordException("Failed to get data from salesforce; REST 
response has no output");
     }
 
-    List<JsonElement> rs = Lists.newArrayList();
+    List<JsonElement> records = Lists.newArrayList();
     JsonElement element = GSON.fromJson(output, JsonObject.class);
     JsonArray partRecords;
     try {
@@ -457,14 +436,12 @@ public class SalesforceExtractor extends RestApiExtractor {
       }
 
       JsonArray array = Utils.removeElementFromJsonArray(partRecords, "attributes");
-      Iterator<JsonElement> li = array.iterator();
-      while (li.hasNext()) {
-        JsonElement recordElement = li.next();
-        rs.add(recordElement);
+      for (JsonElement recordElement : array) {
+        records.add(recordElement);
       }
-      return rs.iterator();
+      return records.iterator();
     } catch (Exception e) {
-      throw new DataRecordException("Failed to get records from salesforce; 
error - " + e.getMessage(), e);
+      throw new DataRecordException("Failed to get records from salesforce", 
e);
     }
   }
 
@@ -489,9 +466,7 @@ public class SalesforceExtractor extends RestApiExtractor {
   private static String buildUrl(String path, List<NameValuePair> qparams) throws RestApiClientException {
     URIBuilder builder = new URIBuilder();
     builder.setPath(path);
-    ListIterator<NameValuePair> i = qparams.listIterator();
-    while (i.hasNext()) {
-      NameValuePair keyValue = i.next();
+    for (NameValuePair keyValue : qparams) {
       builder.setParameter(keyValue.getName(), keyValue.getValue());
     }
     URI uri;
@@ -504,10 +479,7 @@ public class SalesforceExtractor extends RestApiExtractor {
   }
 
   private static boolean isNullPredicate(List<Predicate> predicateList) {
-    if (predicateList == null || predicateList.size() == 0) {
-      return true;
-    }
-    return false;
+    return predicateList == null || predicateList.isEmpty();
   }
 
   @Override
@@ -525,27 +497,27 @@ public class SalesforceExtractor extends RestApiExtractor {
   @Override
   public String getHourPredicateCondition(String column, long value, String valueFormat, String operator) {
     log.info("Getting hour predicate from salesforce");
-    String formattedvalue = Utils.toDateTimeFormat(Long.toString(value), valueFormat, SALESFORCE_HOUR_FORMAT);
-    return column + " " + operator + " " + formattedvalue;
+    String formattedValue = Utils.toDateTimeFormat(Long.toString(value), valueFormat, SALESFORCE_HOUR_FORMAT);
+    return column + " " + operator + " " + formattedValue;
   }
 
   @Override
   public String getDatePredicateCondition(String column, long value, String valueFormat, String operator) {
     log.info("Getting date predicate from salesforce");
-    String formattedvalue = Utils.toDateTimeFormat(Long.toString(value), valueFormat, SALESFORCE_DATE_FORMAT);
-    return column + " " + operator + " " + formattedvalue;
+    String formattedValue = Utils.toDateTimeFormat(Long.toString(value), valueFormat, SALESFORCE_DATE_FORMAT);
+    return column + " " + operator + " " + formattedValue;
   }
 
   @Override
   public String getTimestampPredicateCondition(String column, long value, String valueFormat, String operator) {
     log.info("Getting timestamp predicate from salesforce");
-    String formattedvalue = Utils.toDateTimeFormat(Long.toString(value), valueFormat, SALESFORCE_TIMESTAMP_FORMAT);
-    return column + " " + operator + " " + formattedvalue;
+    String formattedValue = Utils.toDateTimeFormat(Long.toString(value), valueFormat, SALESFORCE_TIMESTAMP_FORMAT);
+    return column + " " + operator + " " + formattedValue;
   }
 
   @Override
   public Map<String, String> getDataTypeMap() {
-    Map<String, String> dataTypeMap = ImmutableMap.<String, String>builder().put("url", "string")
+    return ImmutableMap.<String, String>builder().put("url", "string")
         .put("textarea", "string").put("reference", "string").put("phone", "string").put("masterrecord", "string")
         .put("location", "string").put("id", "string").put("encryptedstring", "string").put("email", "string")
         .put("DataCategoryGroupReference", "string").put("calculated", "string").put("anyType", "string")
@@ -554,11 +526,8 @@ public class SalesforceExtractor extends RestApiExtractor {
         .put("double", "double").put("percent", "double").put("currency", 
"double").put("decimal", "double")
         .put("boolean", "boolean").put("picklist", 
"string").put("multipicklist", "string").put("combobox", "string")
         .put("list", "string").put("set", "string").put("map", 
"string").put("enum", "string").build();
-    return dataTypeMap;
   }
 
-  private Boolean isPkChunkingFetchDone = false;
-
   private Iterator<JsonElement> fetchRecordSetPkChunking(WorkUnit workUnit) {
     if (isPkChunkingFetchDone) {
       return null; // must return null to represent no more data.
@@ -592,19 +561,17 @@ public class SalesforceExtractor extends RestApiExtractor {
     isBulkFetchDone = true;
     log.info("----Get records for bulk batch job----");
     try {
-      // set finish status to false before starting the bulk job
-      this.setBulkJobFinished(false);
-      this.bulkResultIdList = getQueryResultIds(entity, predicateList);
-      log.info("Number of bulk api resultSet Ids:" + 
this.bulkResultIdList.size());
-      List<FileIdVO> fileIdVoList = this.bulkResultIdList.stream()
-          .map(x -> new FileIdVO(this.bulkJob.getId(), x.batchId, x.resultId))
+      List<BatchIdAndResultId> batchIdAndResultIds = getQueryResultIds(entity, 
predicateList);
+      log.info("Number of bulk api resultSet Ids:" + 
batchIdAndResultIds.size());
+      List<FileIdVO> fileIdVoList = batchIdAndResultIds.stream()
+          .map(batchIdAndResultId -> new FileIdVO(this.bulkJob.getId(), 
batchIdAndResultId.batchId, batchIdAndResultId.resultId))
           .collect(Collectors.toList());
       ResultChainingIterator chainingIter = new ResultChainingIterator(
           bulkConnection, fileIdVoList, retryLimit, retryInterval, retryExceedQuotaInterval);
       chainingIter.add(getSoftDeletedRecords(schema, entity, workUnit, predicateList));
       return chainingIter;
     } catch (Exception e) {
-      throw new RuntimeException("Failed to get records using bulk api; error 
- " + e.getMessage(), e);
+      throw new RuntimeException("Failed to get records using bulk api", e);
     }
   }
 
@@ -764,9 +731,7 @@ public class SalesforceExtractor extends RestApiExtractor {
       if (!isNullPredicate(predicateList)) {
         String limitString = getLimitFromInputQuery(query);
         query = query.replace(limitString, "");
-        Iterator<Predicate> i = predicateList.listIterator();
-        while (i.hasNext()) {
-          Predicate predicate = i.next();
+        for (Predicate predicate : predicateList) {
           query = SqlQueryUtils.addPredicate(query, predicate.getCondition());
         }
         query = query + limitString;
@@ -797,8 +762,7 @@ public class SalesforceExtractor extends RestApiExtractor {
         log.error("Bulk batch failed: " + batchResponse.toString());
         throw new Exception("Failed to get bulk batch info for jobId " + jobId 
+ " error - " + batchResponse.getStateMessage());
       }
-      ResultFileIdsStruct resultFileIdsStruct = retrievePkChunkingResultIds(connection, jobId, waitMilliSecond);
-      return resultFileIdsStruct;
+      return retrievePkChunkingResultIds(connection, jobId, waitMilliSecond);
     } catch (Exception e) {
       throw new RuntimeException("getQueryResultIdsPkChunking: error - " + 
e.getMessage(), e);
     }
@@ -810,7 +774,7 @@ public class SalesforceExtractor extends RestApiExtractor {
    * @param predicateList of all predicate conditions
    * @return iterator with batch of records
    */
-  private List<BatchIdAndResultId> getQueryResultIds(String entity, List<Predicate> predicateList) throws Exception {
+  private List<BatchIdAndResultId> getQueryResultIds(String entity, List<Predicate> predicateList) {
     bulkApiLogin();
     try {
       // Set bulk job attributes
@@ -832,16 +796,14 @@ public class SalesforceExtractor extends RestApiExtractor {
         String limitString = getLimitFromInputQuery(query);
         query = query.replace(limitString, "");
 
-        Iterator<Predicate> i = predicateList.listIterator();
-        while (i.hasNext()) {
-          Predicate predicate = i.next();
+        for (Predicate predicate : predicateList) {
           query = SqlQueryUtils.addPredicate(query, predicate.getCondition());
         }
 
         query = query + limitString;
       }
 
-      log.info("getQueryResultIds - QUERY:" + query);
+      log.info("getQueryResultIds - QUERY: {}", query);
       ByteArrayInputStream bout = new ByteArrayInputStream(query.getBytes(ConfigurationKeys.DEFAULT_CHARSET_ENCODING));
 
       BatchInfo bulkBatchInfo = this.bulkConnection.createBatchFromStream(this.bulkJob, bout);
@@ -859,7 +821,10 @@ public class SalesforceExtractor extends RestApiExtractor {
           ConfigurationKeys.EXTRACT_SALESFORCE_BULK_API_MAX_WAIT_TIME_IN_MILLIS_KEY,
           ConfigurationKeys.DEFAULT_EXTRACT_SALESFORCE_BULK_API_MAX_WAIT_TIME_IN_MILLIS);
       while (bulkBatchInfo.getState() == BatchStateEnum.InProgress || bulkBatchInfo.getState() == BatchStateEnum.Queued) {
-        log.info("Waiting for bulk resultSetIds");
+        log.info("Waiting for bulk resultSetIds | Job ID: {} | State: {} | 
State message: {} |"
+                + " Num records processed: {} | Num records failed: {}",
+            bulkBatchInfo.getJobId(), bulkBatchInfo.getState(), 
bulkBatchInfo.getStateMessage(),
+            bulkBatchInfo.getNumberRecordsProcessed(), 
bulkBatchInfo.getNumberRecordsFailed());
         // Exponential backoff
         long waitMilliSeconds = Math.min((long) (Math.pow(2, count) * minWaitTimeInMilliSeconds), maxWaitTimeInMilliSeconds);
         Thread.sleep(waitMilliSeconds);
@@ -873,7 +838,7 @@ public class SalesforceExtractor extends RestApiExtractor {
       // not sure if the state can be "NotProcessed" in non-pk-chunking bulk 
request
       // SFDC doc says - This state is assigned when a job is aborted while 
the batch is queued
       if (state == BatchStateEnum.Failed || state == 
BatchStateEnum.InProgress) {
-        log.error("Bulk batch failed: " + bulkBatchInfo.toString());
+        log.error("Bulk batch failed: {}", bulkBatchInfo);
         throw new RuntimeException("Failed to get bulk batch info for jobId " 
+ bulkBatchInfo.getJobId()
             + " error - " + bulkBatchInfo.getStateMessage());
       }
@@ -893,15 +858,14 @@ public class SalesforceExtractor extends RestApiExtractor {
       return batchIdAndResultIdList;
 
     } catch (RuntimeException | AsyncApiException | InterruptedException e) {
-      throw new RuntimeException(
-          "Failed to get query result ids from salesforce using bulk api; 
error - " + e.getMessage(), e);
+      throw new RuntimeException("Failed to get query result ids from 
salesforce using bulk api", e);
     }
   }
 
   @Override
   public void closeConnection() throws Exception {
     if (this.bulkConnection != null
-        && 
!this.bulkConnection.getJobStatus(this.getBulkJobId()).getState().toString().equals("Closed"))
 {
+        && 
!JobStateEnum.Closed.equals(this.bulkConnection.getJobStatus(this.getBulkJobId()).getState()))
 {
       log.info("Closing salesforce bulk job connection");
       this.bulkConnection.closeJob(this.getBulkJobId());
     }
@@ -909,11 +873,11 @@ public class SalesforceExtractor extends RestApiExtractor {
   }
 
   private static List<Command> constructGetCommand(String restQuery) {
-    return Arrays.asList(new RestApiCommand().build(Arrays.asList(restQuery), RestApiCommandType.GET));
+    return Collections.singletonList(new RestApiCommand().build(Collections.singletonList(restQuery), RestApiCommandType.GET));
   }
 
   private ResultFileIdsStruct retrievePkChunkingResultIdsByBatchId(BulkConnection connection, String jobId, String batchIdListStr) {
-    Iterator<String> batchIds = Arrays.stream(batchIdListStr.split(",")).map(x -> x.trim()).filter(x -> !x.equals("")).iterator();
+    Iterator<String> batchIds = Arrays.stream(batchIdListStr.split(",")).map(String::trim).filter(x -> !x.isEmpty()).iterator();
     try {
       List<BatchIdAndResultId> batchIdAndResultIdList = fetchBatchResultIds(connection, jobId, batchIds);
       return new ResultFileIdsStruct(jobId, batchIdAndResultIdList);
@@ -938,7 +902,7 @@ public class SalesforceExtractor extends RestApiExtractor {
           throw new Exception("PK-Chunking query should have 1 and only 1 
batch with state=NotProcessed.");
         }
         Stream<BatchInfo> stream = Arrays.stream(batchInfos);
-        Iterator<String> batchIds = stream.filter(x -> x.getNumberRecordsProcessed() != 0).map(x -> x.getId()).iterator();
+        Iterator<String> batchIds = stream.filter(x -> x.getNumberRecordsProcessed() != 0).map(BatchInfo::getId).iterator();
         List<BatchIdAndResultId> batchIdAndResultIdList = fetchBatchResultIds(connection, jobId, batchIds);
         return new ResultFileIdsStruct(jobId, batchIdAndResultIdList);
       }
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceHistogramService.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceHistogramService.java
index ec09da57a..17d277ba2 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceHistogramService.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceHistogramService.java
@@ -188,7 +188,7 @@ public class SalesforceHistogramService {
     }
 
     // make a copy of the histogram list and add a dummy entry at the end to avoid special processing of the last group
-    List<HistogramGroup> list = new ArrayList(histogram.getGroups());
+    List<HistogramGroup> list = new ArrayList<>(histogram.getGroups());
     Date hwmDate = Utils.toDate(partition.getHighWatermark(), Partitioner.WATERMARKTIMEFORMAT);
     list.add(new HistogramGroup(Utils.epochToDate(hwmDate.getTime(), SalesforceSource.SECONDS_FORMAT), 0));
 
@@ -285,12 +285,12 @@ public class SalesforceHistogramService {
     int countLeft = getCountForRange(probingContext, sub, values, startEpoch, midpointEpoch);
 
     getHistogramRecursively(probingContext, histogram, sub, values, countLeft, startEpoch, midpointEpoch);
-    log.debug("Count {} for left partition {} to {}", countLeft, startEpoch, midpointEpoch);
+    log.info("Count {} for left partition {} to {}", countLeft, startEpoch, midpointEpoch);
 
     int countRight = count - countLeft;
 
     getHistogramRecursively(probingContext, histogram, sub, values, countRight, midpointEpoch, endEpoch);
-    log.debug("Count {} for right partition {} to {}", countRight, midpointEpoch, endEpoch);
+    log.info("Count {} for right partition {} to {}", countRight, midpointEpoch, endEpoch);
   }
 
 
@@ -339,7 +339,7 @@ public class SalesforceHistogramService {
 
     String query = sub.replace(PROBE_PARTITION_QUERY_TEMPLATE);
 
-    log.debug("Count query: " + query);
+    log.info("Count query: {}", query);
     probingContext.probeCount++;
 
     JsonArray records = getRecordsForQuery(probingContext.connector, query);
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
index aae7681e1..a7fd12b53 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
@@ -17,6 +17,18 @@
 
 package org.apache.gobblin.salesforce;
 
+import java.io.IOException;
+import java.math.RoundingMode;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
@@ -29,17 +41,9 @@ import com.google.gson.Gson;
 import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
-import java.io.IOException;
-import java.math.RoundingMode;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
+
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.SourceState;
 import org.apache.gobblin.configuration.State;
@@ -64,7 +68,7 @@ import org.apache.gobblin.source.extractor.watermark.WatermarkType;
 import org.apache.gobblin.source.workunit.Extract;
 import org.apache.gobblin.source.workunit.WorkUnit;
 
-import static org.apache.gobblin.configuration.ConfigurationKeys.*;
+import static org.apache.gobblin.configuration.ConfigurationKeys.SOURCE_MAX_NUMBER_OF_PARTITIONS;
 import static org.apache.gobblin.salesforce.SalesforceConfigurationKeys.*;
 
 /**
@@ -222,7 +226,7 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
 
   /**
    *  Create work units by taking a bulkJobId.
-   *  The work units won't contain a query in this case. Instead they will contain a BulkJobId and a list of `batchId:resultId`
+   *  The work units won't contain a query in this case. Instead, they will contain a BulkJobId and a list of `batchId:resultId`
    *  So in extractor, the work to do is just to fetch the resultSet files.
    */
   private List<WorkUnit> createWorkUnits(
@@ -359,21 +363,24 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
     DescriptiveStatistics statistics = new DescriptiveStatistics();
 
     int count = 0;
-    HistogramGroup group;
-    Iterator<HistogramGroup> it = groups.iterator();
 
-    while (it.hasNext()) {
-      group = it.next();
+    /*
+      Using greedy algorithm by keep adding group until it exceeds the interval size (x2)
+      Proof: Assuming nth group violates 2 x interval size, then all groups from 0th to (n-1)th, plus nth group,
+      will have total size larger or equal to interval x 2. Hence, we are saturating all intervals (with original size)
+      without leaving any unused space in between. We could choose x3,x4... but it is not space efficient.
+     */
+    for (HistogramGroup group : groups) {
       if (count == 0) {
         // Add a new partition point;
         partitionPoints.add(Utils.toDateTimeFormat(group.getKey(), SECONDS_FORMAT, Partitioner.WATERMARKTIMEFORMAT));
       }
 
-      /**
-       * Using greedy algorithm by keep adding group until it exceeds the interval size (x2)
-       * Proof: Assuming nth group violates 2 x interval size, then all groups from 0th to (n-1)th, plus nth group,
-       * will have total size larger or equal to interval x 2. Hence, we are saturating all intervals (with original size)
-       * without leaving any unused space in between. We could choose x3,x4... but it is not space efficient.
+      /*
+        Using greedy algorithm by keep adding group until it exceeds the interval size (x2)
+        Proof: Assuming nth group violates 2 x interval size, then all groups from 0th to (n-1)th, plus nth group,
+        will have total size larger or equal to interval x 2. Hence, we are saturating all intervals (with original size)
+        without leaving any unused space in between. We could choose x3,x4... but it is not space efficient.
        */
       if (count != 0 && count + group.getCount() >= 2 * interval) {
         // Summarize current group
