tpalfy commented on code in PR #6444:
URL: https://github.com/apache/nifi/pull/6444#discussion_r981142413
##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
##########
@@ -330,76 +334,96 @@ public void onTrigger(final ProcessContext context, final
ProcessSession session
ageFilterUpper
);
+ AtomicReference<String> nextRecordsUrl = new AtomicReference<>();
+
+ do {
+
FlowFile flowFile = session.create();
Map<String, String> originalAttributes = flowFile.getAttributes();
Map<String, String> attributes = new HashMap<>();
AtomicInteger recordCountHolder = new AtomicInteger();
- flowFile = session.write(flowFile, out -> {
- try (
- InputStream querySObjectResultInputStream =
salesforceRestService.query(querySObject);
- JsonTreeRowRecordReader jsonReader = new
JsonTreeRowRecordReader(
- querySObjectResultInputStream,
- getLogger(),
- convertedSalesforceSchema.recordSchema,
- DATE_FORMAT,
- TIME_FORMAT,
- DATE_TIME_FORMAT,
- StartingFieldStrategy.NESTED_FIELD,
- STARTING_FIELD_NAME,
- SchemaApplicationStrategy.SELECTED_PART
- );
-
- RecordSetWriter writer = writerFactory.createWriter(
- getLogger(),
- writerFactory.getSchema(
- originalAttributes,
- convertedSalesforceSchema.recordSchema
- ),
- out,
- originalAttributes
- )
- ) {
- writer.beginRecordSet();
-
- Record querySObjectRecord;
- while ((querySObjectRecord = jsonReader.nextRecord()) != null)
{
- writer.write(querySObjectRecord);
- }
-
- WriteResult writeResult = writer.finishRecordSet();
-
- attributes.put("record.count",
String.valueOf(writeResult.getRecordCount()));
- attributes.put(CoreAttributes.MIME_TYPE.key(),
writer.getMimeType());
- attributes.putAll(writeResult.getAttributes());
- recordCountHolder.set(writeResult.getRecordCount());
- if (ageFilterUpper != null) {
- Map<String, String> newState = new
HashMap<>(state.toMap());
- newState.put(LAST_AGE_FILTER, ageFilterUpper);
- updateState(context, newState);
+ flowFile = session.write(flowFile, out -> {
+ try (
+ InputStream querySObjectResultInputStream =
getResultInputStream(nextRecordsUrl, querySObject);
+
+ JsonTreeRowRecordReader jsonReader = new
JsonTreeRowRecordReader(
+ querySObjectResultInputStream,
+ getLogger(),
+ convertedSalesforceSchema.recordSchema,
+ DATE_FORMAT,
+ TIME_FORMAT,
+ DATE_TIME_FORMAT,
+ StartingFieldStrategy.NESTED_FIELD,
+ STARTING_FIELD_NAME,
+ SchemaApplicationStrategy.SELECTED_PART,
+ CAPTURE_PREDICATE
+ );
+
+ RecordSetWriter writer = writerFactory.createWriter(
+ getLogger(),
+ writerFactory.getSchema(
+ originalAttributes,
+ convertedSalesforceSchema.recordSchema
+ ),
+ out,
+ originalAttributes
+ )
+ ) {
+ writer.beginRecordSet();
+
+ Record querySObjectRecord;
+ while ((querySObjectRecord = jsonReader.nextRecord()) !=
null) {
+ writer.write(querySObjectRecord);
+ }
+
+ WriteResult writeResult = writer.finishRecordSet();
+
+ Map<String, String> storedFields =
jsonReader.getCapturedFields();
+
+ nextRecordsUrl.set(storedFields.getOrDefault(CURSOR_URL,
null));
Review Comment:
```suggestion
Map<String, String> capturedFields =
jsonReader.getCapturedFields();
nextRecordsUrl.set(capturedFields.getOrDefault(CURSOR_URL, null));
```
##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
##########
@@ -330,76 +334,96 @@ public void onTrigger(final ProcessContext context, final
ProcessSession session
ageFilterUpper
);
+ AtomicReference<String> nextRecordsUrl = new AtomicReference<>();
+
+ do {
+
FlowFile flowFile = session.create();
Map<String, String> originalAttributes = flowFile.getAttributes();
Map<String, String> attributes = new HashMap<>();
AtomicInteger recordCountHolder = new AtomicInteger();
- flowFile = session.write(flowFile, out -> {
- try (
- InputStream querySObjectResultInputStream =
salesforceRestService.query(querySObject);
- JsonTreeRowRecordReader jsonReader = new
JsonTreeRowRecordReader(
- querySObjectResultInputStream,
- getLogger(),
- convertedSalesforceSchema.recordSchema,
- DATE_FORMAT,
- TIME_FORMAT,
- DATE_TIME_FORMAT,
- StartingFieldStrategy.NESTED_FIELD,
- STARTING_FIELD_NAME,
- SchemaApplicationStrategy.SELECTED_PART
- );
-
- RecordSetWriter writer = writerFactory.createWriter(
- getLogger(),
- writerFactory.getSchema(
- originalAttributes,
- convertedSalesforceSchema.recordSchema
- ),
- out,
- originalAttributes
- )
- ) {
- writer.beginRecordSet();
-
- Record querySObjectRecord;
- while ((querySObjectRecord = jsonReader.nextRecord()) != null)
{
- writer.write(querySObjectRecord);
- }
-
- WriteResult writeResult = writer.finishRecordSet();
-
- attributes.put("record.count",
String.valueOf(writeResult.getRecordCount()));
- attributes.put(CoreAttributes.MIME_TYPE.key(),
writer.getMimeType());
- attributes.putAll(writeResult.getAttributes());
- recordCountHolder.set(writeResult.getRecordCount());
- if (ageFilterUpper != null) {
- Map<String, String> newState = new
HashMap<>(state.toMap());
- newState.put(LAST_AGE_FILTER, ageFilterUpper);
- updateState(context, newState);
+ flowFile = session.write(flowFile, out -> {
+ try (
+ InputStream querySObjectResultInputStream =
getResultInputStream(nextRecordsUrl, querySObject);
+
+ JsonTreeRowRecordReader jsonReader = new
JsonTreeRowRecordReader(
+ querySObjectResultInputStream,
+ getLogger(),
+ convertedSalesforceSchema.recordSchema,
+ DATE_FORMAT,
+ TIME_FORMAT,
+ DATE_TIME_FORMAT,
+ StartingFieldStrategy.NESTED_FIELD,
+ STARTING_FIELD_NAME,
+ SchemaApplicationStrategy.SELECTED_PART,
+ CAPTURE_PREDICATE
+ );
+
+ RecordSetWriter writer = writerFactory.createWriter(
+ getLogger(),
+ writerFactory.getSchema(
+ originalAttributes,
+ convertedSalesforceSchema.recordSchema
+ ),
+ out,
+ originalAttributes
+ )
+ ) {
+ writer.beginRecordSet();
+
+ Record querySObjectRecord;
+ while ((querySObjectRecord = jsonReader.nextRecord()) !=
null) {
+ writer.write(querySObjectRecord);
+ }
+
+ WriteResult writeResult = writer.finishRecordSet();
+
+ Map<String, String> storedFields =
jsonReader.getCapturedFields();
+
+ nextRecordsUrl.set(storedFields.getOrDefault(CURSOR_URL,
null));
+
+ attributes.put("record.count",
String.valueOf(writeResult.getRecordCount()));
+ attributes.put(CoreAttributes.MIME_TYPE.key(),
writer.getMimeType());
+ attributes.putAll(writeResult.getAttributes());
+
+ recordCountHolder.set(writeResult.getRecordCount());
+
+ if (ageFilterUpper != null) {
+ Map<String, String> newState = new
HashMap<>(state.toMap());
+ newState.put(LAST_AGE_FILTER, ageFilterUpper);
+ updateState(context, newState);
+ }
+ } catch (SchemaNotFoundException e) {
+ throw new ProcessException("Couldn't create record
writer", e);
+ } catch (MalformedRecordException e) {
+ throw new ProcessException("Couldn't read records from
input", e);
}
- } catch (SchemaNotFoundException e) {
- throw new ProcessException("Couldn't create record writer", e);
- } catch (MalformedRecordException e) {
- throw new ProcessException("Couldn't read records from input",
e);
- }
- });
+ });
- int recordCount = recordCountHolder.get();
+ int recordCount = recordCountHolder.get();
- if (!createZeroRecordFlowFiles && recordCount == 0) {
- session.remove(flowFile);
- } else {
- flowFile = session.putAllAttributes(flowFile, attributes);
- session.transfer(flowFile, REL_SUCCESS);
+ if (!createZeroRecordFlowFiles && recordCount == 0) {
+ session.remove(flowFile);
+ } else {
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ session.transfer(flowFile, REL_SUCCESS);
+
+ session.adjustCounter("Records Processed", recordCount, false);
+ getLogger().info("Successfully written {} records for {}",
recordCount, flowFile);
+ }
+ } while (nextRecordsUrl.get() != null);
+ }
- session.adjustCounter("Records Processed", recordCount, false);
- getLogger().info("Successfully written {} records for {}",
recordCount, flowFile);
+ private InputStream getResultInputStream(AtomicReference<String>
nextRecordsUrl, String querySObject) {
+ if (nextRecordsUrl.get() == null) {
+ return salesforceRestService.query(querySObject);
}
+ return salesforceRestService.queryNextRecords(nextRecordsUrl.get());
Review Comment:
```suggestion
return salesforceRestService.getNextRecords(nextRecordsUrl.get());
```
##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
##########
@@ -220,6 +222,8 @@ public class QuerySalesforceObject extends
AbstractProcessor {
private static final String DATE_FORMAT = "yyyy-MM-dd";
private static final String TIME_FORMAT = "HH:mm:ss.SSSX";
private static final String DATE_TIME_FORMAT =
"yyyy-MM-dd'T'HH:mm:ss.SSSZZZZ";
+ private static final String CURSOR_URL = "nextRecordsUrl";
Review Comment:
```suggestion
private static final String NEXT_RECORDS_URL = "nextRecordsUrl";
```
##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java:
##########
@@ -69,6 +69,21 @@ public InputStream query(String query) {
return request(request);
}
+ public InputStream queryNextRecords(String uri) {
+ String url = baseUrl + uri;
Review Comment:
```suggestion
public InputStream getNextRecords(String nextRecordsUrl) {
String url = baseUrl + nextRecordsUrl;
```
##########
nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java:
##########
@@ -48,9 +47,12 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.function.BiPredicate;
import java.util.function.Supplier;
public abstract class AbstractJsonRowRecordReader implements RecordReader {
+ private static final String ARRAY_OR_OBJECT_TYPE = "array or object type
field";
+
Review Comment:
```suggestion
```
##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
##########
@@ -330,76 +334,96 @@ public void onTrigger(final ProcessContext context, final
ProcessSession session
ageFilterUpper
);
+ AtomicReference<String> nextRecordsUrl = new AtomicReference<>();
+
+ do {
+
FlowFile flowFile = session.create();
Map<String, String> originalAttributes = flowFile.getAttributes();
Map<String, String> attributes = new HashMap<>();
AtomicInteger recordCountHolder = new AtomicInteger();
Review Comment:
```suggestion
FlowFile flowFile = session.create();
Map<String, String> originalAttributes =
flowFile.getAttributes();
Map<String, String> attributes = new HashMap<>();
AtomicInteger recordCountHolder = new AtomicInteger();
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]