This is an automated email from the ASF dual-hosted git repository.

tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 63aac1a31d NIFI-10513: Added capture non-record fields to JsonTreeRowRecordReader, added pagination to QuerySalesforceObject
63aac1a31d is described below

commit 63aac1a31d5d35fb133d5768abf99201964a16b4
Author: Lehel Boér <[email protected]>
AuthorDate: Fri Sep 23 05:10:12 2022 +0200

    NIFI-10513: Added capture non-record fields to JsonTreeRowRecordReader, added pagination to QuerySalesforceObject
    
    This closes #6444.
    
    Signed-off-by: Tamas Palfy <[email protected]>
---
 .../nifi/json/AbstractJsonRowRecordReader.java     |  77 +++++++++--
 .../apache/nifi/json/JsonTreeRowRecordReader.java  |  11 +-
 .../salesforce/QuerySalesforceObject.java          | 147 ++++++++++++---------
 .../salesforce/util/SalesforceRestService.java     |  15 +++
 .../java/org/apache/nifi/json/JsonTreeReader.java  |   4 +-
 .../nifi/json/TestJsonTreeRowRecordReader.java     |  87 +++++++++---
 .../src/test/resources/json/capture-fields.json    |  20 +++
 7 files changed, 268 insertions(+), 93 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
index a3515d03db..b024beed6c 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
@@ -21,7 +21,6 @@ import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonToken;
-import com.fasterxml.jackson.core.io.SerializedString;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
@@ -48,9 +47,11 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.function.BiPredicate;
 import java.util.function.Supplier;
 
 public abstract class AbstractJsonRowRecordReader implements RecordReader {
+
     private final ComponentLog logger;
     private final Supplier<DateFormat> LAZY_DATE_FORMAT;
     private final Supplier<DateFormat> LAZY_TIME_FORMAT;
@@ -64,6 +65,9 @@ public abstract class AbstractJsonRowRecordReader implements 
RecordReader {
     private JsonNode firstJsonNode;
     private StartingFieldStrategy strategy;
 
+    private Map<String, String> capturedFields;
+    private BiPredicate<String, String> captureFieldPredicate;
+
     private AbstractJsonRowRecordReader(final ComponentLog logger, final 
String dateFormat, final String timeFormat, final String timestampFormat) {
         this.logger = logger;
 
@@ -76,27 +80,61 @@ public abstract class AbstractJsonRowRecordReader 
implements RecordReader {
         LAZY_TIMESTAMP_FORMAT = () -> tsf;
     }
 
-    protected AbstractJsonRowRecordReader(final InputStream in, final 
ComponentLog logger, final String dateFormat, final String timeFormat, final 
String timestampFormat)
+    protected AbstractJsonRowRecordReader(final InputStream in,
+                                          final ComponentLog logger,
+                                          final String dateFormat,
+                                          final String timeFormat,
+                                          final String timestampFormat)
             throws IOException, MalformedRecordException {
 
-        this(in, logger, dateFormat, timeFormat, timestampFormat, null, null);
+        this(in, logger, dateFormat, timeFormat, timestampFormat, null, null, 
null);
     }
 
-    protected AbstractJsonRowRecordReader(final InputStream in, final 
ComponentLog logger, final String dateFormat, final String timeFormat, final 
String timestampFormat,
-                                          final StartingFieldStrategy 
strategy, final String nestedFieldName) throws IOException, 
MalformedRecordException {
+    /**
+     * Constructor with initial logic for JSON to NiFi record parsing.
+     *
+     * @param in                     the input stream to parse
+     * @param logger                 ComponentLog
+     * @param dateFormat             format for parsing date fields
+     * @param timeFormat             format for parsing time fields
+     * @param timestampFormat        format for parsing timestamp fields
+     * @param strategy               whether to start processing from a 
specific field
+     * @param nestedFieldName        the name of the field to start the 
processing from
+     * @param captureFieldPredicate predicate that takes a JSON fieldName and 
fieldValue to capture top-level non-processed fields which can
+     *                               be accessed by calling {@link 
#getCapturedFields()}
+     * @throws IOException              in case of JSON stream processing 
failure
+     * @throws MalformedRecordException in case of malformed JSON input
+     */
+    protected AbstractJsonRowRecordReader(final InputStream in,
+                                          final ComponentLog logger,
+                                          final String dateFormat,
+                                          final String timeFormat,
+                                          final String timestampFormat,
+                                          final StartingFieldStrategy strategy,
+                                          final String nestedFieldName,
+                                          final BiPredicate<String, String> 
captureFieldPredicate)
+            throws IOException, MalformedRecordException {
 
         this(logger, dateFormat, timeFormat, timestampFormat);
 
         this.strategy = strategy;
+        this.captureFieldPredicate = captureFieldPredicate;
+        capturedFields = new HashMap<>();
 
         try {
             jsonParser = jsonFactory.createParser(in);
             jsonParser.setCodec(codec);
 
             if (strategy == StartingFieldStrategy.NESTED_FIELD) {
-                final SerializedString serializedStartingFieldName = new 
SerializedString(nestedFieldName);
-                while (!jsonParser.nextFieldName(serializedStartingFieldName) 
&& jsonParser.hasCurrentToken());
-                logger.debug("Parsing starting at nested field [{}]", 
nestedFieldName);
+                while (jsonParser.nextToken() != null) {
+                    if (nestedFieldName.equals(jsonParser.getCurrentName())) {
+                        logger.debug("Parsing starting at nested field [{}]", 
nestedFieldName);
+                        break;
+                    }
+                    if (captureFieldPredicate != null) {
+                        captureCurrentField(captureFieldPredicate);
+                    }
+                }
             }
 
             JsonToken token = jsonParser.nextToken();
@@ -130,6 +168,11 @@ public abstract class AbstractJsonRowRecordReader 
implements RecordReader {
     public Record nextRecord(final boolean coerceTypes, final boolean 
dropUnknownFields) throws IOException, MalformedRecordException {
         final JsonNode nextNode = getNextJsonNode();
         if (nextNode == null) {
+            if (captureFieldPredicate != null) {
+                while (jsonParser.nextToken() != null) {
+                    captureCurrentField(captureFieldPredicate);
+                }
+            }
             return null;
         }
 
@@ -242,6 +285,19 @@ public abstract class AbstractJsonRowRecordReader 
implements RecordReader {
         return null;
     }
 
+    private void captureCurrentField(BiPredicate<String, String> 
captureFieldPredicate) throws IOException {
+        if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME) {
+            jsonParser.nextToken();
+
+            final String fieldName = jsonParser.getCurrentName();
+            final String fieldValue = jsonParser.getValueAsString();
+
+            if (captureFieldPredicate.test(fieldName, fieldValue)) {
+                capturedFields.put(fieldName, fieldValue);
+            }
+        }
+    }
+
     private Map<String, Object> getMapFromRawValue(final JsonNode fieldNode, 
final DataType dataType, final String fieldName) throws IOException {
         if (dataType == null || dataType.getFieldType() != 
RecordFieldType.MAP) {
             return null;
@@ -389,4 +445,9 @@ public abstract class AbstractJsonRowRecordReader 
implements RecordReader {
     }
 
     protected abstract Record convertJsonNodeToRecord(JsonNode nextNode, 
RecordSchema schema, boolean coerceTypes, boolean dropUnknownFields) throws 
IOException, MalformedRecordException;
+
+
+    public Map<String, String> getCapturedFields() {
+        return capturedFields;
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
index ae2ae11afb..fce50e35c9 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
@@ -37,13 +37,14 @@ import 
org.apache.nifi.serialization.record.util.DataTypeUtils;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
-import java.util.LinkedHashMap;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Queue;
+import java.util.function.BiPredicate;
 import java.util.function.Supplier;
 
 public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
@@ -52,16 +53,16 @@ public class JsonTreeRowRecordReader extends 
AbstractJsonRowRecordReader {
 
     public JsonTreeRowRecordReader(final InputStream in, final ComponentLog 
logger, final RecordSchema schema,
                                    final String dateFormat, final String 
timeFormat, final String timestampFormat) throws IOException, 
MalformedRecordException {
-        this(in, logger, schema, dateFormat, timeFormat, timestampFormat, 
null, null, null);
+        this(in, logger, schema, dateFormat, timeFormat, timestampFormat, 
null, null, null, null);
     }
 
     public JsonTreeRowRecordReader(final InputStream in, final ComponentLog 
logger, final RecordSchema schema,
                                    final String dateFormat, final String 
timeFormat, final String timestampFormat,
                                    final StartingFieldStrategy 
startingFieldStrategy, final String startingFieldName,
-                                   final SchemaApplicationStrategy 
schemaApplicationStrategy)
+                                   final SchemaApplicationStrategy 
schemaApplicationStrategy, final BiPredicate<String, String> 
captureFieldPredicate)
             throws IOException, MalformedRecordException {
 
-        super(in, logger, dateFormat, timeFormat, timestampFormat, 
startingFieldStrategy, startingFieldName);
+        super(in, logger, dateFormat, timeFormat, timestampFormat, 
startingFieldStrategy, startingFieldName, captureFieldPredicate);
         if (startingFieldStrategy == StartingFieldStrategy.NESTED_FIELD && 
schemaApplicationStrategy == SchemaApplicationStrategy.WHOLE_JSON) {
             this.schema = getSelectedSchema(schema, startingFieldName);
         } else {
@@ -79,7 +80,7 @@ public class JsonTreeRowRecordReader extends 
AbstractJsonRowRecordReader {
                 return getChildSchemaFromField(optionalRecordField.get());
             } else {
                 for (RecordField field : currentSchema.getFields()) {
-                    if (field.getDataType() instanceof  ArrayDataType || 
field.getDataType() instanceof RecordDataType) {
+                    if (field.getDataType() instanceof ArrayDataType || 
field.getDataType() instanceof RecordDataType) {
                         schemas.add(getChildSchemaFromField(field));
                     }
                 }
diff --git 
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java
 
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java
index df3ff3c6b3..5d1ba58d11 100644
--- 
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java
+++ 
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java
@@ -75,6 +75,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiPredicate;
 
 @PrimaryNodeOnly
 @TriggerSerially
@@ -220,6 +222,8 @@ public class QuerySalesforceObject extends 
AbstractProcessor {
     private static final String DATE_FORMAT = "yyyy-MM-dd";
     private static final String TIME_FORMAT = "HH:mm:ss.SSSX";
     private static final String DATE_TIME_FORMAT = 
"yyyy-MM-dd'T'HH:mm:ss.SSSZZZZ";
+    private static final String NEXT_RECORDS_URL = "nextRecordsUrl";
+    private static final BiPredicate<String, String> CAPTURE_PREDICATE = 
(fieldName, fieldValue) -> NEXT_RECORDS_URL.equals(fieldName);
 
     private volatile SalesforceToRecordSchemaConverter 
salesForceToRecordSchemaConverter;
     private volatile SalesforceRestService salesforceRestService;
@@ -330,76 +334,93 @@ public class QuerySalesforceObject extends 
AbstractProcessor {
                 ageFilterUpper
         );
 
-        FlowFile flowFile = session.create();
-
-        Map<String, String> originalAttributes = flowFile.getAttributes();
-        Map<String, String> attributes = new HashMap<>();
-
-        AtomicInteger recordCountHolder = new AtomicInteger();
-
-        flowFile = session.write(flowFile, out -> {
-            try (
-                    InputStream querySObjectResultInputStream = 
salesforceRestService.query(querySObject);
-                    JsonTreeRowRecordReader jsonReader = new 
JsonTreeRowRecordReader(
-                            querySObjectResultInputStream,
-                            getLogger(),
-                            convertedSalesforceSchema.recordSchema,
-                            DATE_FORMAT,
-                            TIME_FORMAT,
-                            DATE_TIME_FORMAT,
-                            StartingFieldStrategy.NESTED_FIELD,
-                            STARTING_FIELD_NAME,
-                            SchemaApplicationStrategy.SELECTED_PART
-                    );
-
-                    RecordSetWriter writer = writerFactory.createWriter(
-                            getLogger(),
-                            writerFactory.getSchema(
-                                    originalAttributes,
-                                    convertedSalesforceSchema.recordSchema
-                            ),
-                            out,
-                            originalAttributes
-                    )
-            ) {
-                writer.beginRecordSet();
-
-                Record querySObjectRecord;
-                while ((querySObjectRecord = jsonReader.nextRecord()) != null) 
{
-                    writer.write(querySObjectRecord);
+        AtomicReference<String> nextRecordsUrl = new AtomicReference<>();
+
+        do {
+
+            FlowFile flowFile = session.create();
+            Map<String, String> originalAttributes = flowFile.getAttributes();
+            Map<String, String> attributes = new HashMap<>();
+
+            AtomicInteger recordCountHolder = new AtomicInteger();
+
+            flowFile = session.write(flowFile, out -> {
+                try (
+                        InputStream querySObjectResultInputStream = 
getResultInputStream(nextRecordsUrl, querySObject);
+
+                        JsonTreeRowRecordReader jsonReader = new 
JsonTreeRowRecordReader(
+                                querySObjectResultInputStream,
+                                getLogger(),
+                                convertedSalesforceSchema.recordSchema,
+                                DATE_FORMAT,
+                                TIME_FORMAT,
+                                DATE_TIME_FORMAT,
+                                StartingFieldStrategy.NESTED_FIELD,
+                                STARTING_FIELD_NAME,
+                                SchemaApplicationStrategy.SELECTED_PART,
+                                CAPTURE_PREDICATE
+                        );
+
+                        RecordSetWriter writer = writerFactory.createWriter(
+                                getLogger(),
+                                writerFactory.getSchema(
+                                        originalAttributes,
+                                        convertedSalesforceSchema.recordSchema
+                                ),
+                                out,
+                                originalAttributes
+                        )
+                ) {
+                    writer.beginRecordSet();
+
+                    Record querySObjectRecord;
+                    while ((querySObjectRecord = jsonReader.nextRecord()) != 
null) {
+                        writer.write(querySObjectRecord);
+                    }
+
+                    WriteResult writeResult = writer.finishRecordSet();
+
+                    Map<String, String> capturedFields = 
jsonReader.getCapturedFields();
+
+                    
nextRecordsUrl.set(capturedFields.getOrDefault(NEXT_RECORDS_URL, null));
+
+                    attributes.put("record.count", 
String.valueOf(writeResult.getRecordCount()));
+                    attributes.put(CoreAttributes.MIME_TYPE.key(), 
writer.getMimeType());
+                    attributes.putAll(writeResult.getAttributes());
+
+                    recordCountHolder.set(writeResult.getRecordCount());
+
+                    if (ageFilterUpper != null) {
+                        Map<String, String> newState = new 
HashMap<>(state.toMap());
+                        newState.put(LAST_AGE_FILTER, ageFilterUpper);
+                        updateState(context, newState);
+                    }
+                } catch (SchemaNotFoundException e) {
+                    throw new ProcessException("Couldn't create record 
writer", e);
+                } catch (MalformedRecordException e) {
+                    throw new ProcessException("Couldn't read records from 
input", e);
                 }
+            });
 
-                WriteResult writeResult = writer.finishRecordSet();
+            int recordCount = recordCountHolder.get();
 
-                attributes.put("record.count", 
String.valueOf(writeResult.getRecordCount()));
-                attributes.put(CoreAttributes.MIME_TYPE.key(), 
writer.getMimeType());
-                attributes.putAll(writeResult.getAttributes());
-
-                recordCountHolder.set(writeResult.getRecordCount());
+            if (!createZeroRecordFlowFiles && recordCount == 0) {
+                session.remove(flowFile);
+            } else {
+                flowFile = session.putAllAttributes(flowFile, attributes);
+                session.transfer(flowFile, REL_SUCCESS);
 
-                if (ageFilterUpper != null) {
-                    Map<String, String> newState = new 
HashMap<>(state.toMap());
-                    newState.put(LAST_AGE_FILTER, ageFilterUpper);
-                    updateState(context, newState);
-                }
-            } catch (SchemaNotFoundException e) {
-                throw new ProcessException("Couldn't create record writer", e);
-            } catch (MalformedRecordException e) {
-                throw new ProcessException("Couldn't read records from input", 
e);
+                session.adjustCounter("Records Processed", recordCount, false);
+                getLogger().info("Successfully written {} records for {}", 
recordCount, flowFile);
             }
-        });
-
-        int recordCount = recordCountHolder.get();
-
-        if (!createZeroRecordFlowFiles && recordCount == 0) {
-            session.remove(flowFile);
-        } else {
-            flowFile = session.putAllAttributes(flowFile, attributes);
-            session.transfer(flowFile, REL_SUCCESS);
+        } while (nextRecordsUrl.get() != null);
+    }
 
-            session.adjustCounter("Records Processed", recordCount, false);
-            getLogger().info("Successfully written {} records for {}", 
recordCount, flowFile);
+    private InputStream getResultInputStream(AtomicReference<String> 
nextRecordsUrl, String querySObject) {
+        if (nextRecordsUrl.get() == null) {
+            return salesforceRestService.query(querySObject);
         }
+        return salesforceRestService.getNextRecords(nextRecordsUrl.get());
     }
 
     private ConvertedSalesforceSchema getConvertedSalesforceSchema(String 
sObject, String fields) {
diff --git 
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java
 
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java
index bc3f746158..8d6a8fddae 100644
--- 
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java
+++ 
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java
@@ -69,6 +69,21 @@ public class SalesforceRestService {
         return request(request);
     }
 
+    public InputStream getNextRecords(String nextRecordsUrl) {
+        String url = baseUrl + nextRecordsUrl;
+
+        HttpUrl httpUrl = HttpUrl.get(url).newBuilder()
+                .build();
+
+        Request request = new Request.Builder()
+                .addHeader("Authorization", "Bearer " + 
accessTokenProvider.get())
+                .url(httpUrl)
+                .get()
+                .build();
+
+        return request(request);
+    }
+
     private InputStream request(Request request) {
         Response response = null;
         try {
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
index 5de7d74c7d..1d4caa8dbc 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
@@ -76,7 +76,6 @@ public class JsonTreeReader extends SchemaRegistryService 
implements RecordReade
     private volatile StartingFieldStrategy startingFieldStrategy;
     private volatile SchemaApplicationStrategy schemaApplicationStrategy;
 
-
     public static final PropertyDescriptor STARTING_FIELD_STRATEGY = new 
PropertyDescriptor.Builder()
             .name("starting-field-strategy")
             .displayName("Starting Field Strategy")
@@ -165,6 +164,7 @@ public class JsonTreeReader extends SchemaRegistryService 
implements RecordReade
     public RecordReader createRecordReader(final Map<String, String> 
variables, final InputStream in, final long inputLength, final ComponentLog 
logger)
             throws IOException, MalformedRecordException, 
SchemaNotFoundException {
         final RecordSchema schema = getSchema(variables, in, null);
-        return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, 
timeFormat, timestampFormat, startingFieldStrategy, startingFieldName, 
schemaApplicationStrategy);
+        return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, 
timeFormat, timestampFormat, startingFieldStrategy, startingFieldName,
+                schemaApplicationStrategy, null);
     }
 }
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
index e885c95df7..e8b0b6c48c 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
@@ -51,7 +51,9 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BiPredicate;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -94,14 +96,6 @@ class TestJsonTreeRowRecordReader {
         return new SimpleRecordSchema(accountFields);
     }
 
-    private RecordSchema getSchema() {
-        final DataType accountType = 
RecordFieldType.RECORD.getRecordDataType(getAccountSchema());
-        final List<RecordField> fields = getDefaultFields();
-        fields.add(new RecordField("account", accountType));
-        fields.remove(new RecordField("balance", 
RecordFieldType.DOUBLE.getDataType()));
-        return new SimpleRecordSchema(fields);
-    }
-
     @Test
     void testReadChoiceOfStringOrArrayOfRecords() throws IOException, 
MalformedRecordException {
         final File schemaFile = new 
File("src/test/resources/json/choice-of-string-or-array-record.avsc");
@@ -1253,6 +1247,54 @@ class TestJsonTreeRowRecordReader {
                 "nestedLevel2Record", SchemaApplicationStrategy.WHOLE_JSON);
     }
 
+    @Test
+    void testCaptureFields() throws IOException, MalformedRecordException {
+        Map<String, String> expectedCapturedFields = new HashMap<>();
+        expectedCapturedFields.put("id", "1");
+        expectedCapturedFields.put("zipCode", "11111");
+        expectedCapturedFields.put("country", "USA");
+        expectedCapturedFields.put("job", null);
+        Set<String> fieldsToCapture = expectedCapturedFields.keySet();
+        BiPredicate<String, String> capturePredicate = (fieldName, fieldValue) 
-> fieldsToCapture.contains(fieldName);
+        String startingFieldName = "accounts";
+
+
+        SimpleRecordSchema accountRecordSchema = new 
SimpleRecordSchema(Arrays.asList(
+                new RecordField("id", RecordFieldType.INT.getDataType()),
+                new RecordField("balance", 
RecordFieldType.DOUBLE.getDataType())
+        ));
+
+        SimpleRecordSchema jobRecordSchema = new 
SimpleRecordSchema(Arrays.asList(
+                new RecordField("salary", RecordFieldType.INT.getDataType()),
+                new RecordField("position", 
RecordFieldType.STRING.getDataType())
+        ));
+
+        SimpleRecordSchema recordSchema = new SimpleRecordSchema(Arrays.asList(
+                new RecordField("id", RecordFieldType.INT.getDataType()),
+                new RecordField("accounts", 
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(accountRecordSchema))),
+                new RecordField("name", RecordFieldType.STRING.getDataType()),
+                new RecordField("address", 
RecordFieldType.STRING.getDataType()),
+                new RecordField("city", RecordFieldType.STRING.getDataType()),
+                new RecordField("job", 
RecordFieldType.RECORD.getRecordDataType(jobRecordSchema)),
+                new RecordField("state", RecordFieldType.STRING.getDataType()),
+                new RecordField("zipCode", 
RecordFieldType.STRING.getDataType()),
+                new RecordField("country", 
RecordFieldType.STRING.getDataType())
+        ));
+
+        try (InputStream in = new 
FileInputStream("src/test/resources/json/capture-fields.json")) {
+            JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(
+                    in, mock(ComponentLog.class), recordSchema,
+                    dateFormat, timeFormat, timestampFormat,
+                    StartingFieldStrategy.NESTED_FIELD, startingFieldName,
+                    SchemaApplicationStrategy.SELECTED_PART, capturePredicate);
+
+            while (reader.nextRecord() != null);
+            Map<String, String> capturedFields = reader.getCapturedFields();
+
+            assertEquals(expectedCapturedFields, capturedFields);
+        }
+    }
+
     private void testReadRecords(String jsonPath, List<Object> expected) 
throws IOException, MalformedRecordException {
         final File jsonFile = new File(jsonPath);
         try (
@@ -1263,8 +1305,12 @@ class TestJsonTreeRowRecordReader {
         }
     }
 
-    private void testReadRecords(String jsonPath, List<Object> expected, 
StartingFieldStrategy strategy,
-                                 String startingFieldName) throws IOException, 
MalformedRecordException {
+    private void testReadRecords(String jsonPath,
+                                 List<Object> expected,
+                                 StartingFieldStrategy strategy,
+                                 String startingFieldName)
+            throws IOException, MalformedRecordException {
+
         final File jsonFile = new File(jsonPath);
         try (InputStream jsonStream = new 
ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile))) {
             RecordSchema schema = inferSchema(jsonStream, strategy, 
startingFieldName);
@@ -1279,8 +1325,14 @@ class TestJsonTreeRowRecordReader {
         }
     }
 
-    private void testReadRecords(String jsonPath, RecordSchema schema, 
List<Object> expected, StartingFieldStrategy strategy,
-                                 String startingFieldName, 
SchemaApplicationStrategy schemaApplicationStrategy) throws IOException, 
MalformedRecordException {
+    private void testReadRecords(String jsonPath,
+                                 RecordSchema schema,
+                                 List<Object> expected,
+                                 StartingFieldStrategy strategy,
+                                 String startingFieldName,
+                                 SchemaApplicationStrategy 
schemaApplicationStrategy
+    ) throws IOException, MalformedRecordException {
+
         final File jsonFile = new File(jsonPath);
         try (InputStream jsonStream = new 
ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile))) {
             testReadRecords(jsonStream, schema, expected, strategy, 
startingFieldName, schemaApplicationStrategy);
@@ -1314,11 +1366,16 @@ class TestJsonTreeRowRecordReader {
         }
     }
 
-    private void testReadRecords(InputStream jsonStream, RecordSchema schema, 
List<Object> expected, StartingFieldStrategy strategy,
-                                 String startingFieldName, 
SchemaApplicationStrategy schemaApplicationStrategy)
+    private void testReadRecords(InputStream jsonStream,
+                                 RecordSchema schema,
+                                 List<Object> expected,
+                                 StartingFieldStrategy strategy,
+                                 String startingFieldName,
+                                 SchemaApplicationStrategy 
schemaApplicationStrategy)
             throws IOException, MalformedRecordException {
+
         try (JsonTreeRowRecordReader reader = new 
JsonTreeRowRecordReader(jsonStream, mock(ComponentLog.class), schema, 
dateFormat, timeFormat, timestampFormat,
-                strategy, startingFieldName, schemaApplicationStrategy)) {
+                strategy, startingFieldName, schemaApplicationStrategy, null)) 
{
             List<Object> actual = new ArrayList<>();
             Record record;
 
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/capture-fields.json
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/capture-fields.json
new file mode 100644
index 0000000000..2011528f23
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/capture-fields.json
@@ -0,0 +1,20 @@
+{
+       "id": 1,
+       "accounts": [{
+               "id": 42,
+               "balance": 4750.89
+       }, {
+               "id": 43,
+               "balance": 48212.38
+       }],
+       "name": "John Doe",
+       "address": "123 My Street",
+       "city": "My City",
+       "job" : {
+               "salary": 115431,
+               "position": "acountant"
+       },
+       "state": "MS",
+       "zipCode": "11111",
+       "country": "USA"
+}
\ No newline at end of file

Reply via email to