This is an automated email from the ASF dual-hosted git repository.
tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 63aac1a31d NIFI-10513: Added capturing of non-record fields to
JsonTreeRowRecordReader, added pagination to QuerySalesforceObject
63aac1a31d is described below
commit 63aac1a31d5d35fb133d5768abf99201964a16b4
Author: Lehel Boér <[email protected]>
AuthorDate: Fri Sep 23 05:10:12 2022 +0200
NIFI-10513: Added capturing of non-record fields to JsonTreeRowRecordReader,
added pagination to QuerySalesforceObject
This closes #6444.
Signed-off-by: Tamas Palfy <[email protected]>
---
.../nifi/json/AbstractJsonRowRecordReader.java | 77 +++++++++--
.../apache/nifi/json/JsonTreeRowRecordReader.java | 11 +-
.../salesforce/QuerySalesforceObject.java | 147 ++++++++++++---------
.../salesforce/util/SalesforceRestService.java | 15 +++
.../java/org/apache/nifi/json/JsonTreeReader.java | 4 +-
.../nifi/json/TestJsonTreeRowRecordReader.java | 87 +++++++++---
.../src/test/resources/json/capture-fields.json | 20 +++
7 files changed, 268 insertions(+), 93 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
index a3515d03db..b024beed6c 100644
---
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
@@ -21,7 +21,6 @@ import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
-import com.fasterxml.jackson.core.io.SerializedString;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
@@ -48,9 +47,11 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.function.BiPredicate;
import java.util.function.Supplier;
public abstract class AbstractJsonRowRecordReader implements RecordReader {
+
private final ComponentLog logger;
private final Supplier<DateFormat> LAZY_DATE_FORMAT;
private final Supplier<DateFormat> LAZY_TIME_FORMAT;
@@ -64,6 +65,9 @@ public abstract class AbstractJsonRowRecordReader implements
RecordReader {
private JsonNode firstJsonNode;
private StartingFieldStrategy strategy;
+ private Map<String, String> capturedFields;
+ private BiPredicate<String, String> captureFieldPredicate;
+
private AbstractJsonRowRecordReader(final ComponentLog logger, final
String dateFormat, final String timeFormat, final String timestampFormat) {
this.logger = logger;
@@ -76,27 +80,61 @@ public abstract class AbstractJsonRowRecordReader
implements RecordReader {
LAZY_TIMESTAMP_FORMAT = () -> tsf;
}
- protected AbstractJsonRowRecordReader(final InputStream in, final
ComponentLog logger, final String dateFormat, final String timeFormat, final
String timestampFormat)
+ protected AbstractJsonRowRecordReader(final InputStream in,
+ final ComponentLog logger,
+ final String dateFormat,
+ final String timeFormat,
+ final String timestampFormat)
throws IOException, MalformedRecordException {
- this(in, logger, dateFormat, timeFormat, timestampFormat, null, null);
+ this(in, logger, dateFormat, timeFormat, timestampFormat, null, null,
null);
}
- protected AbstractJsonRowRecordReader(final InputStream in, final
ComponentLog logger, final String dateFormat, final String timeFormat, final
String timestampFormat,
- final StartingFieldStrategy
strategy, final String nestedFieldName) throws IOException,
MalformedRecordException {
+ /**
+ * Constructor with initial logic for JSON to NiFi record parsing.
+ *
+ * @param in the input stream to parse
+ * @param logger ComponentLog
+ * @param dateFormat format for parsing date fields
+ * @param timeFormat format for parsing time fields
+ * @param timestampFormat format for parsing timestamp fields
+ * @param strategy whether to start processing from a
specific field
+ * @param nestedFieldName the name of the field to start the
processing from
+ * @param captureFieldPredicate predicate that takes a JSON fieldName and
fieldValue to capture top-level non-processed fields which can
+ * be accessed by calling {@link
#getCapturedFields()}
+ * @throws IOException in case of JSON stream processing
failure
+ * @throws MalformedRecordException in case of malformed JSON input
+ */
+ protected AbstractJsonRowRecordReader(final InputStream in,
+ final ComponentLog logger,
+ final String dateFormat,
+ final String timeFormat,
+ final String timestampFormat,
+ final StartingFieldStrategy strategy,
+ final String nestedFieldName,
+ final BiPredicate<String, String>
captureFieldPredicate)
+ throws IOException, MalformedRecordException {
this(logger, dateFormat, timeFormat, timestampFormat);
this.strategy = strategy;
+ this.captureFieldPredicate = captureFieldPredicate;
+ capturedFields = new HashMap<>();
try {
jsonParser = jsonFactory.createParser(in);
jsonParser.setCodec(codec);
if (strategy == StartingFieldStrategy.NESTED_FIELD) {
- final SerializedString serializedStartingFieldName = new
SerializedString(nestedFieldName);
- while (!jsonParser.nextFieldName(serializedStartingFieldName)
&& jsonParser.hasCurrentToken());
- logger.debug("Parsing starting at nested field [{}]",
nestedFieldName);
+ while (jsonParser.nextToken() != null) {
+ if (nestedFieldName.equals(jsonParser.getCurrentName())) {
+ logger.debug("Parsing starting at nested field [{}]",
nestedFieldName);
+ break;
+ }
+ if (captureFieldPredicate != null) {
+ captureCurrentField(captureFieldPredicate);
+ }
+ }
}
JsonToken token = jsonParser.nextToken();
@@ -130,6 +168,11 @@ public abstract class AbstractJsonRowRecordReader
implements RecordReader {
public Record nextRecord(final boolean coerceTypes, final boolean
dropUnknownFields) throws IOException, MalformedRecordException {
final JsonNode nextNode = getNextJsonNode();
if (nextNode == null) {
+ if (captureFieldPredicate != null) {
+ while (jsonParser.nextToken() != null) {
+ captureCurrentField(captureFieldPredicate);
+ }
+ }
return null;
}
@@ -242,6 +285,19 @@ public abstract class AbstractJsonRowRecordReader
implements RecordReader {
return null;
}
+ private void captureCurrentField(BiPredicate<String, String>
captureFieldPredicate) throws IOException {
+ if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME) {
+ jsonParser.nextToken();
+
+ final String fieldName = jsonParser.getCurrentName();
+ final String fieldValue = jsonParser.getValueAsString();
+
+ if (captureFieldPredicate.test(fieldName, fieldValue)) {
+ capturedFields.put(fieldName, fieldValue);
+ }
+ }
+ }
+
private Map<String, Object> getMapFromRawValue(final JsonNode fieldNode,
final DataType dataType, final String fieldName) throws IOException {
if (dataType == null || dataType.getFieldType() !=
RecordFieldType.MAP) {
return null;
@@ -389,4 +445,9 @@ public abstract class AbstractJsonRowRecordReader
implements RecordReader {
}
protected abstract Record convertJsonNodeToRecord(JsonNode nextNode,
RecordSchema schema, boolean coerceTypes, boolean dropUnknownFields) throws
IOException, MalformedRecordException;
+
+
+ public Map<String, String> getCapturedFields() {
+ return capturedFields;
+ }
}
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
index ae2ae11afb..fce50e35c9 100644
---
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
@@ -37,13 +37,14 @@ import
org.apache.nifi.serialization.record.util.DataTypeUtils;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
-import java.util.LinkedHashMap;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
+import java.util.function.BiPredicate;
import java.util.function.Supplier;
public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
@@ -52,16 +53,16 @@ public class JsonTreeRowRecordReader extends
AbstractJsonRowRecordReader {
public JsonTreeRowRecordReader(final InputStream in, final ComponentLog
logger, final RecordSchema schema,
final String dateFormat, final String
timeFormat, final String timestampFormat) throws IOException,
MalformedRecordException {
- this(in, logger, schema, dateFormat, timeFormat, timestampFormat,
null, null, null);
+ this(in, logger, schema, dateFormat, timeFormat, timestampFormat,
null, null, null, null);
}
public JsonTreeRowRecordReader(final InputStream in, final ComponentLog
logger, final RecordSchema schema,
final String dateFormat, final String
timeFormat, final String timestampFormat,
final StartingFieldStrategy
startingFieldStrategy, final String startingFieldName,
- final SchemaApplicationStrategy
schemaApplicationStrategy)
+ final SchemaApplicationStrategy
schemaApplicationStrategy, final BiPredicate<String, String>
captureFieldPredicate)
throws IOException, MalformedRecordException {
- super(in, logger, dateFormat, timeFormat, timestampFormat,
startingFieldStrategy, startingFieldName);
+ super(in, logger, dateFormat, timeFormat, timestampFormat,
startingFieldStrategy, startingFieldName, captureFieldPredicate);
if (startingFieldStrategy == StartingFieldStrategy.NESTED_FIELD &&
schemaApplicationStrategy == SchemaApplicationStrategy.WHOLE_JSON) {
this.schema = getSelectedSchema(schema, startingFieldName);
} else {
@@ -79,7 +80,7 @@ public class JsonTreeRowRecordReader extends
AbstractJsonRowRecordReader {
return getChildSchemaFromField(optionalRecordField.get());
} else {
for (RecordField field : currentSchema.getFields()) {
- if (field.getDataType() instanceof ArrayDataType ||
field.getDataType() instanceof RecordDataType) {
+ if (field.getDataType() instanceof ArrayDataType ||
field.getDataType() instanceof RecordDataType) {
schemas.add(getChildSchemaFromField(field));
}
}
diff --git
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java
index df3ff3c6b3..5d1ba58d11 100644
---
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java
+++
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java
@@ -75,6 +75,8 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiPredicate;
@PrimaryNodeOnly
@TriggerSerially
@@ -220,6 +222,8 @@ public class QuerySalesforceObject extends
AbstractProcessor {
private static final String DATE_FORMAT = "yyyy-MM-dd";
private static final String TIME_FORMAT = "HH:mm:ss.SSSX";
private static final String DATE_TIME_FORMAT =
"yyyy-MM-dd'T'HH:mm:ss.SSSZZZZ";
+ private static final String NEXT_RECORDS_URL = "nextRecordsUrl";
+ private static final BiPredicate<String, String> CAPTURE_PREDICATE =
(fieldName, fieldValue) -> NEXT_RECORDS_URL.equals(fieldName);
private volatile SalesforceToRecordSchemaConverter
salesForceToRecordSchemaConverter;
private volatile SalesforceRestService salesforceRestService;
@@ -330,76 +334,93 @@ public class QuerySalesforceObject extends
AbstractProcessor {
ageFilterUpper
);
- FlowFile flowFile = session.create();
-
- Map<String, String> originalAttributes = flowFile.getAttributes();
- Map<String, String> attributes = new HashMap<>();
-
- AtomicInteger recordCountHolder = new AtomicInteger();
-
- flowFile = session.write(flowFile, out -> {
- try (
- InputStream querySObjectResultInputStream =
salesforceRestService.query(querySObject);
- JsonTreeRowRecordReader jsonReader = new
JsonTreeRowRecordReader(
- querySObjectResultInputStream,
- getLogger(),
- convertedSalesforceSchema.recordSchema,
- DATE_FORMAT,
- TIME_FORMAT,
- DATE_TIME_FORMAT,
- StartingFieldStrategy.NESTED_FIELD,
- STARTING_FIELD_NAME,
- SchemaApplicationStrategy.SELECTED_PART
- );
-
- RecordSetWriter writer = writerFactory.createWriter(
- getLogger(),
- writerFactory.getSchema(
- originalAttributes,
- convertedSalesforceSchema.recordSchema
- ),
- out,
- originalAttributes
- )
- ) {
- writer.beginRecordSet();
-
- Record querySObjectRecord;
- while ((querySObjectRecord = jsonReader.nextRecord()) != null)
{
- writer.write(querySObjectRecord);
+ AtomicReference<String> nextRecordsUrl = new AtomicReference<>();
+
+ do {
+
+ FlowFile flowFile = session.create();
+ Map<String, String> originalAttributes = flowFile.getAttributes();
+ Map<String, String> attributes = new HashMap<>();
+
+ AtomicInteger recordCountHolder = new AtomicInteger();
+
+ flowFile = session.write(flowFile, out -> {
+ try (
+ InputStream querySObjectResultInputStream =
getResultInputStream(nextRecordsUrl, querySObject);
+
+ JsonTreeRowRecordReader jsonReader = new
JsonTreeRowRecordReader(
+ querySObjectResultInputStream,
+ getLogger(),
+ convertedSalesforceSchema.recordSchema,
+ DATE_FORMAT,
+ TIME_FORMAT,
+ DATE_TIME_FORMAT,
+ StartingFieldStrategy.NESTED_FIELD,
+ STARTING_FIELD_NAME,
+ SchemaApplicationStrategy.SELECTED_PART,
+ CAPTURE_PREDICATE
+ );
+
+ RecordSetWriter writer = writerFactory.createWriter(
+ getLogger(),
+ writerFactory.getSchema(
+ originalAttributes,
+ convertedSalesforceSchema.recordSchema
+ ),
+ out,
+ originalAttributes
+ )
+ ) {
+ writer.beginRecordSet();
+
+ Record querySObjectRecord;
+ while ((querySObjectRecord = jsonReader.nextRecord()) !=
null) {
+ writer.write(querySObjectRecord);
+ }
+
+ WriteResult writeResult = writer.finishRecordSet();
+
+ Map<String, String> capturedFields =
jsonReader.getCapturedFields();
+
+
nextRecordsUrl.set(capturedFields.getOrDefault(NEXT_RECORDS_URL, null));
+
+ attributes.put("record.count",
String.valueOf(writeResult.getRecordCount()));
+ attributes.put(CoreAttributes.MIME_TYPE.key(),
writer.getMimeType());
+ attributes.putAll(writeResult.getAttributes());
+
+ recordCountHolder.set(writeResult.getRecordCount());
+
+ if (ageFilterUpper != null) {
+ Map<String, String> newState = new
HashMap<>(state.toMap());
+ newState.put(LAST_AGE_FILTER, ageFilterUpper);
+ updateState(context, newState);
+ }
+ } catch (SchemaNotFoundException e) {
+ throw new ProcessException("Couldn't create record
writer", e);
+ } catch (MalformedRecordException e) {
+ throw new ProcessException("Couldn't read records from
input", e);
}
+ });
- WriteResult writeResult = writer.finishRecordSet();
+ int recordCount = recordCountHolder.get();
- attributes.put("record.count",
String.valueOf(writeResult.getRecordCount()));
- attributes.put(CoreAttributes.MIME_TYPE.key(),
writer.getMimeType());
- attributes.putAll(writeResult.getAttributes());
-
- recordCountHolder.set(writeResult.getRecordCount());
+ if (!createZeroRecordFlowFiles && recordCount == 0) {
+ session.remove(flowFile);
+ } else {
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ session.transfer(flowFile, REL_SUCCESS);
- if (ageFilterUpper != null) {
- Map<String, String> newState = new
HashMap<>(state.toMap());
- newState.put(LAST_AGE_FILTER, ageFilterUpper);
- updateState(context, newState);
- }
- } catch (SchemaNotFoundException e) {
- throw new ProcessException("Couldn't create record writer", e);
- } catch (MalformedRecordException e) {
- throw new ProcessException("Couldn't read records from input",
e);
+ session.adjustCounter("Records Processed", recordCount, false);
+ getLogger().info("Successfully written {} records for {}",
recordCount, flowFile);
}
- });
-
- int recordCount = recordCountHolder.get();
-
- if (!createZeroRecordFlowFiles && recordCount == 0) {
- session.remove(flowFile);
- } else {
- flowFile = session.putAllAttributes(flowFile, attributes);
- session.transfer(flowFile, REL_SUCCESS);
+ } while (nextRecordsUrl.get() != null);
+ }
- session.adjustCounter("Records Processed", recordCount, false);
- getLogger().info("Successfully written {} records for {}",
recordCount, flowFile);
+ private InputStream getResultInputStream(AtomicReference<String>
nextRecordsUrl, String querySObject) {
+ if (nextRecordsUrl.get() == null) {
+ return salesforceRestService.query(querySObject);
}
+ return salesforceRestService.getNextRecords(nextRecordsUrl.get());
}
private ConvertedSalesforceSchema getConvertedSalesforceSchema(String
sObject, String fields) {
diff --git
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java
index bc3f746158..8d6a8fddae 100644
---
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java
+++
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java
@@ -69,6 +69,21 @@ public class SalesforceRestService {
return request(request);
}
+ public InputStream getNextRecords(String nextRecordsUrl) {
+ String url = baseUrl + nextRecordsUrl;
+
+ HttpUrl httpUrl = HttpUrl.get(url).newBuilder()
+ .build();
+
+ Request request = new Request.Builder()
+ .addHeader("Authorization", "Bearer " +
accessTokenProvider.get())
+ .url(httpUrl)
+ .get()
+ .build();
+
+ return request(request);
+ }
+
private InputStream request(Request request) {
Response response = null;
try {
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
index 5de7d74c7d..1d4caa8dbc 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
@@ -76,7 +76,6 @@ public class JsonTreeReader extends SchemaRegistryService
implements RecordReade
private volatile StartingFieldStrategy startingFieldStrategy;
private volatile SchemaApplicationStrategy schemaApplicationStrategy;
-
public static final PropertyDescriptor STARTING_FIELD_STRATEGY = new
PropertyDescriptor.Builder()
.name("starting-field-strategy")
.displayName("Starting Field Strategy")
@@ -165,6 +164,7 @@ public class JsonTreeReader extends SchemaRegistryService
implements RecordReade
public RecordReader createRecordReader(final Map<String, String>
variables, final InputStream in, final long inputLength, final ComponentLog
logger)
throws IOException, MalformedRecordException,
SchemaNotFoundException {
final RecordSchema schema = getSchema(variables, in, null);
- return new JsonTreeRowRecordReader(in, logger, schema, dateFormat,
timeFormat, timestampFormat, startingFieldStrategy, startingFieldName,
schemaApplicationStrategy);
+ return new JsonTreeRowRecordReader(in, logger, schema, dateFormat,
timeFormat, timestampFormat, startingFieldStrategy, startingFieldName,
+ schemaApplicationStrategy, null);
}
}
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
index e885c95df7..e8b0b6c48c 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
@@ -51,7 +51,9 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -94,14 +96,6 @@ class TestJsonTreeRowRecordReader {
return new SimpleRecordSchema(accountFields);
}
- private RecordSchema getSchema() {
- final DataType accountType =
RecordFieldType.RECORD.getRecordDataType(getAccountSchema());
- final List<RecordField> fields = getDefaultFields();
- fields.add(new RecordField("account", accountType));
- fields.remove(new RecordField("balance",
RecordFieldType.DOUBLE.getDataType()));
- return new SimpleRecordSchema(fields);
- }
-
@Test
void testReadChoiceOfStringOrArrayOfRecords() throws IOException,
MalformedRecordException {
final File schemaFile = new
File("src/test/resources/json/choice-of-string-or-array-record.avsc");
@@ -1253,6 +1247,54 @@ class TestJsonTreeRowRecordReader {
"nestedLevel2Record", SchemaApplicationStrategy.WHOLE_JSON);
}
+ @Test
+ void testCaptureFields() throws IOException, MalformedRecordException {
+ Map<String, String> expectedCapturedFields = new HashMap<>();
+ expectedCapturedFields.put("id", "1");
+ expectedCapturedFields.put("zipCode", "11111");
+ expectedCapturedFields.put("country", "USA");
+ expectedCapturedFields.put("job", null);
+ Set<String> fieldsToCapture = expectedCapturedFields.keySet();
+ BiPredicate<String, String> capturePredicate = (fieldName, fieldValue)
-> fieldsToCapture.contains(fieldName);
+ String startingFieldName = "accounts";
+
+
+ SimpleRecordSchema accountRecordSchema = new
SimpleRecordSchema(Arrays.asList(
+ new RecordField("id", RecordFieldType.INT.getDataType()),
+ new RecordField("balance",
RecordFieldType.DOUBLE.getDataType())
+ ));
+
+ SimpleRecordSchema jobRecordSchema = new
SimpleRecordSchema(Arrays.asList(
+ new RecordField("salary", RecordFieldType.INT.getDataType()),
+ new RecordField("position",
RecordFieldType.STRING.getDataType())
+ ));
+
+ SimpleRecordSchema recordSchema = new SimpleRecordSchema(Arrays.asList(
+ new RecordField("id", RecordFieldType.INT.getDataType()),
+ new RecordField("accounts",
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(accountRecordSchema))),
+ new RecordField("name", RecordFieldType.STRING.getDataType()),
+ new RecordField("address",
RecordFieldType.STRING.getDataType()),
+ new RecordField("city", RecordFieldType.STRING.getDataType()),
+ new RecordField("job",
RecordFieldType.RECORD.getRecordDataType(jobRecordSchema)),
+ new RecordField("state", RecordFieldType.STRING.getDataType()),
+ new RecordField("zipCode",
RecordFieldType.STRING.getDataType()),
+ new RecordField("country",
RecordFieldType.STRING.getDataType())
+ ));
+
+ try (InputStream in = new
FileInputStream("src/test/resources/json/capture-fields.json")) {
+ JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(
+ in, mock(ComponentLog.class), recordSchema,
+ dateFormat, timeFormat, timestampFormat,
+ StartingFieldStrategy.NESTED_FIELD, startingFieldName,
+ SchemaApplicationStrategy.SELECTED_PART, capturePredicate);
+
+ while (reader.nextRecord() != null);
+ Map<String, String> capturedFields = reader.getCapturedFields();
+
+ assertEquals(expectedCapturedFields, capturedFields);
+ }
+ }
+
private void testReadRecords(String jsonPath, List<Object> expected)
throws IOException, MalformedRecordException {
final File jsonFile = new File(jsonPath);
try (
@@ -1263,8 +1305,12 @@ class TestJsonTreeRowRecordReader {
}
}
- private void testReadRecords(String jsonPath, List<Object> expected,
StartingFieldStrategy strategy,
- String startingFieldName) throws IOException,
MalformedRecordException {
+ private void testReadRecords(String jsonPath,
+ List<Object> expected,
+ StartingFieldStrategy strategy,
+ String startingFieldName)
+ throws IOException, MalformedRecordException {
+
final File jsonFile = new File(jsonPath);
try (InputStream jsonStream = new
ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile))) {
RecordSchema schema = inferSchema(jsonStream, strategy,
startingFieldName);
@@ -1279,8 +1325,14 @@ class TestJsonTreeRowRecordReader {
}
}
- private void testReadRecords(String jsonPath, RecordSchema schema,
List<Object> expected, StartingFieldStrategy strategy,
- String startingFieldName,
SchemaApplicationStrategy schemaApplicationStrategy) throws IOException,
MalformedRecordException {
+ private void testReadRecords(String jsonPath,
+ RecordSchema schema,
+ List<Object> expected,
+ StartingFieldStrategy strategy,
+ String startingFieldName,
+ SchemaApplicationStrategy
schemaApplicationStrategy
+ ) throws IOException, MalformedRecordException {
+
final File jsonFile = new File(jsonPath);
try (InputStream jsonStream = new
ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile))) {
testReadRecords(jsonStream, schema, expected, strategy,
startingFieldName, schemaApplicationStrategy);
@@ -1314,11 +1366,16 @@ class TestJsonTreeRowRecordReader {
}
}
- private void testReadRecords(InputStream jsonStream, RecordSchema schema,
List<Object> expected, StartingFieldStrategy strategy,
- String startingFieldName,
SchemaApplicationStrategy schemaApplicationStrategy)
+ private void testReadRecords(InputStream jsonStream,
+ RecordSchema schema,
+ List<Object> expected,
+ StartingFieldStrategy strategy,
+ String startingFieldName,
+ SchemaApplicationStrategy
schemaApplicationStrategy)
throws IOException, MalformedRecordException {
+
try (JsonTreeRowRecordReader reader = new
JsonTreeRowRecordReader(jsonStream, mock(ComponentLog.class), schema,
dateFormat, timeFormat, timestampFormat,
- strategy, startingFieldName, schemaApplicationStrategy)) {
+ strategy, startingFieldName, schemaApplicationStrategy, null))
{
List<Object> actual = new ArrayList<>();
Record record;
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/capture-fields.json
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/capture-fields.json
new file mode 100644
index 0000000000..2011528f23
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/capture-fields.json
@@ -0,0 +1,20 @@
+{
+ "id": 1,
+ "accounts": [{
+ "id": 42,
+ "balance": 4750.89
+ }, {
+ "id": 43,
+ "balance": 48212.38
+ }],
+ "name": "John Doe",
+ "address": "123 My Street",
+ "city": "My City",
+ "job" : {
+ "salary": 115431,
+ "position": "acountant"
+ },
+ "state": "MS",
+ "zipCode": "11111",
+ "country": "USA"
+}
\ No newline at end of file