This is an automated email from the ASF dual-hosted git repository.

tpalfy pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit e0c1f0a89b9595b7a4d40b51737e5f9da876c07f
Author: Lehel Boér <[email protected]>
AuthorDate: Wed Feb 15 18:37:15 2023 +0100

    NIFI-11147: Query all fields in QuerySalesforceObject
    
    Fix review comments
---
 .../salesforce/QuerySalesforceObject.java          | 40 +++++++++++++++------
 .../util/SalesforceToRecordSchemaConverter.java    | 28 ++++++++-------
 .../SalesforceToRecordSchemaConverterTest.java     | 42 +++++++++++++++-------
 .../salesforce/util/TestRecordExtender.java        |  6 ++--
 4 files changed, 78 insertions(+), 38 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java
 
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java
index b5125a9eb1..d831f56948 100644
--- 
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java
+++ 
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java
@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.processors.salesforce;
 
+import org.apache.camel.component.salesforce.api.dto.SObjectDescription;
+import org.apache.camel.component.salesforce.api.dto.SObjectField;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
@@ -60,6 +62,7 @@ import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.StringUtils;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -80,6 +83,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiPredicate;
+import java.util.stream.Collectors;
 
 import static 
org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.API_URL;
 import static 
org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.API_VERSION;
@@ -120,8 +124,8 @@ public class QuerySalesforceObject extends 
AbstractProcessor {
     static final PropertyDescriptor FIELD_NAMES = new 
PropertyDescriptor.Builder()
             .name("field-names")
             .displayName("Field Names")
-            .description("Comma-separated list of field names requested from 
the sObject to be queried")
-            .required(true)
+            .description("Comma-separated list of field names requested from 
the sObject to be queried. When this field is left empty, all fields are 
queried.")
+            .required(false)
             
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
             .build();
@@ -295,7 +299,14 @@ public class QuerySalesforceObject extends 
AbstractProcessor {
             ageFilterUpper = 
ageFilterUpperTime.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
         }
 
-        ConvertedSalesforceSchema convertedSalesforceSchema = 
getConvertedSalesforceSchema(sObject, fields);
+        SalesforceSchemaHolder salesForceSchemaHolder = 
getConvertedSalesforceSchema(sObject, fields);
+
+        if (StringUtils.isBlank(fields)) {
+            fields = salesForceSchemaHolder.getSalesforceObject().getFields()
+                    .stream()
+                    .map(SObjectField::getName)
+                    .collect(Collectors.joining(","));
+        }
 
         String querySObject = buildQuery(
                 sObject,
@@ -324,7 +335,7 @@ public class QuerySalesforceObject extends 
AbstractProcessor {
                         JsonTreeRowRecordReader jsonReader = new 
JsonTreeRowRecordReader(
                                 querySObjectResultInputStream,
                                 getLogger(),
-                                convertedSalesforceSchema.recordSchema,
+                                salesForceSchemaHolder.recordSchema,
                                 DATE_FORMAT,
                                 TIME_FORMAT,
                                 DATE_TIME_FORMAT,
@@ -338,7 +349,7 @@ public class QuerySalesforceObject extends 
AbstractProcessor {
                                 getLogger(),
                                 writerFactory.getSchema(
                                         originalAttributes,
-                                        convertedSalesforceSchema.recordSchema
+                                        salesForceSchemaHolder.recordSchema
                                 ),
                                 out,
                                 originalAttributes
@@ -400,7 +411,7 @@ public class QuerySalesforceObject extends 
AbstractProcessor {
         return salesforceRestService.getNextRecords(nextRecordsUrl.get());
     }
 
-    private ConvertedSalesforceSchema getConvertedSalesforceSchema(String 
sObject, String fields) {
+    private SalesforceSchemaHolder getConvertedSalesforceSchema(String 
sObject, String fields) {
         try (InputStream describeSObjectResult = 
salesforceRestService.describeSObject(sObject)) {
             return convertSchema(describeSObjectResult, fields);
         } catch (IOException e) {
@@ -416,9 +427,10 @@ public class QuerySalesforceObject extends 
AbstractProcessor {
         }
     }
 
-    protected ConvertedSalesforceSchema convertSchema(InputStream 
describeSObjectResult, String fields) {
+    protected SalesforceSchemaHolder convertSchema(InputStream 
describeSObjectResult, String fieldsOfInterest) {
         try {
-            RecordSchema recordSchema = 
salesForceToRecordSchemaConverter.convertSchema(describeSObjectResult, fields);
+            SObjectDescription salesforceObject = 
salesForceToRecordSchemaConverter.getSalesforceObject(describeSObjectResult);
+            RecordSchema recordSchema = 
salesForceToRecordSchemaConverter.convertSchema(salesforceObject, 
fieldsOfInterest);
 
             RecordSchema querySObjectResultSchema = new 
SimpleRecordSchema(Collections.singletonList(
                     new RecordField(STARTING_FIELD_NAME, 
RecordFieldType.ARRAY.getArrayDataType(
@@ -428,7 +440,7 @@ public class QuerySalesforceObject extends 
AbstractProcessor {
                     ))
             ));
 
-            return new ConvertedSalesforceSchema(querySObjectResultSchema, 
recordSchema);
+            return new SalesforceSchemaHolder(querySObjectResultSchema, 
recordSchema, salesforceObject);
         } catch (IOException e) {
             throw new ProcessException("SObject to Record schema conversion 
failed", e);
         }
@@ -471,13 +483,19 @@ public class QuerySalesforceObject extends 
AbstractProcessor {
         return queryBuilder.toString();
     }
 
-    static class ConvertedSalesforceSchema {
+    static class SalesforceSchemaHolder {
         RecordSchema querySObjectResultSchema;
         RecordSchema recordSchema;
+        SObjectDescription salesforceObject;
 
-        public ConvertedSalesforceSchema(RecordSchema 
querySObjectResultSchema, RecordSchema recordSchema) {
+        public SalesforceSchemaHolder(RecordSchema querySObjectResultSchema, 
RecordSchema recordSchema, SObjectDescription salesforceObject) {
             this.querySObjectResultSchema = querySObjectResultSchema;
             this.recordSchema = recordSchema;
+            this.salesforceObject = salesforceObject;
+        }
+
+        public SObjectDescription getSalesforceObject() {
+            return salesforceObject;
         }
     }
 }
diff --git 
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverter.java
 
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverter.java
index f4396415ea..5bc6637161 100644
--- 
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverter.java
+++ 
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverter.java
@@ -24,6 +24,7 @@ import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.StringUtils;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -33,27 +34,30 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 public class SalesforceToRecordSchemaConverter {
-
+    private static final ObjectMapper OBJECT_MAPPER = 
JsonUtils.createObjectMapper();
     private final String dateFormat;
     private final String dateTimeFormat;
     private final String timeFormat;
-    private final ObjectMapper objectMapper;
 
     public SalesforceToRecordSchemaConverter(String dateFormat, String 
dateTimeFormat, String timeFormat) {
         this.dateFormat = dateFormat;
         this.dateTimeFormat = dateTimeFormat;
         this.timeFormat = timeFormat;
-        objectMapper = JsonUtils.createObjectMapper();
     }
 
-    public RecordSchema convertSchema(final InputStream 
describeSOjbectResultJsonString, final String fieldNamesOfInterest) throws 
IOException {
+    public SObjectDescription getSalesforceObject(InputStream 
salesforceObjectResultJsonString) throws IOException {
+        return OBJECT_MAPPER.readValue(salesforceObjectResultJsonString, 
SObjectDescription.class);
+    }
 
-        final SObjectDescription sObjectDescription = 
objectMapper.readValue(describeSOjbectResultJsonString, 
SObjectDescription.class);
-        final List<String> listOfFieldNamesOfInterest = 
Arrays.asList(fieldNamesOfInterest.toLowerCase().split("\\s*,\\s*"));
-        final List<SObjectField> fields = sObjectDescription.getFields()
-                .stream()
-                .filter(sObjectField -> 
listOfFieldNamesOfInterest.contains(sObjectField.getName().toLowerCase()))
-                .collect(Collectors.toList());
+    public RecordSchema convertSchema(SObjectDescription salesforceObject, 
String fieldNamesOfInterest) {
+        List<SObjectField> fields = salesforceObject.getFields();
+        if (StringUtils.isNotBlank(fieldNamesOfInterest)) {
+            final List<String> listOfFieldNamesOfInterest = 
Arrays.asList(fieldNamesOfInterest.toLowerCase().split("\\s*,\\s*"));
+            fields = fields
+                    .stream()
+                    .filter(sObjectField -> 
listOfFieldNamesOfInterest.contains(sObjectField.getName().toLowerCase()))
+                    .collect(Collectors.toList());
+        }
 
         final List<RecordField> recordFields = new ArrayList<>();
 
@@ -110,8 +114,8 @@ public class SalesforceToRecordSchemaConverter {
                     recordFields.add(new RecordField(field.getName(), 
RecordFieldType.RECORD.getRecordDataType(locationSchema), 
field.getDefaultValue(), field.isNillable()));
                     break;
                 default:
-                    throw new IllegalArgumentException(String.format("Could 
not create determine schema for '%s'. Could not convert field '%s' of soap type 
'%s'.",
-                            sObjectDescription.getName(), field.getName(), 
soapType));
+                    throw new IllegalArgumentException(String.format("Could 
not determine schema for '%s'. Could not convert field '%s' of soap type '%s'.",
+                            salesforceObject.getName(), field.getName(), 
soapType));
             }
         }
 
diff --git 
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverterTest.java
 
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverterTest.java
index c0bc3a80c1..408bd62394 100644
--- 
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverterTest.java
+++ 
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverterTest.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.processors.salesforce.util;
 
 import com.fasterxml.jackson.databind.exc.MismatchedInputException;
+import org.apache.camel.component.salesforce.api.dto.SObjectDescription;
 import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
@@ -29,7 +30,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
-import java.util.Collections;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -69,7 +69,8 @@ class SalesforceToRecordSchemaConverterTest {
         ));
 
         try (final InputStream sfSchema = readFile(TEST_PATH + 
salesforceSchemaFileName)) {
-            final RecordSchema actual = converter.convertSchema(sfSchema, 
fieldNames);
+            final SObjectDescription salesforceObject = 
converter.getSalesforceObject(sfSchema);
+            final RecordSchema actual = 
converter.convertSchema(salesforceObject, fieldNames);
 
             assertEquals(expected, actual);
         }
@@ -102,7 +103,8 @@ class SalesforceToRecordSchemaConverterTest {
         ));
 
         try (final InputStream sfSchema = readFile(TEST_PATH + 
salesforceSchemaFileName)) {
-            final RecordSchema actual = converter.convertSchema(sfSchema, 
fieldNames);
+            final SObjectDescription salesforceObject = 
converter.getSalesforceObject(sfSchema);
+            final RecordSchema actual = 
converter.convertSchema(salesforceObject, fieldNames);
 
             assertEquals(expected, actual);
         }
@@ -119,21 +121,36 @@ class SalesforceToRecordSchemaConverterTest {
         ));
 
         try (final InputStream sfSchema = readFile(TEST_PATH + 
salesforceSchemaFileName)) {
-            final RecordSchema actual = converter.convertSchema(sfSchema, 
fieldNames);
+            final SObjectDescription salesforceObject = 
converter.getSalesforceObject(sfSchema);
+            final RecordSchema actual = 
converter.convertSchema(salesforceObject, fieldNames);
 
             assertEquals(expected, actual);
         }
     }
 
     @Test
-    void testSelectEmptyFields() throws IOException {
+    void testSelectAllFields() throws IOException {
         final String salesforceSchemaFileName = "simple_sf_schema.json";
         final String fieldNames = "";
 
-        final RecordSchema expected = new 
SimpleRecordSchema(Collections.emptyList());
+        final RecordSchema expected = new SimpleRecordSchema(Arrays.asList(
+                new RecordField("ExampleInt", 
RecordFieldType.INT.getDataType()),
+                new RecordField("ExampleLong", 
RecordFieldType.LONG.getDataType()),
+                new RecordField("ExampleDouble", 
RecordFieldType.DOUBLE.getDataType()),
+                new RecordField("ExampleBoolean", 
RecordFieldType.BOOLEAN.getDataType()),
+                new RecordField("ExampleID", 
RecordFieldType.STRING.getDataType()),
+                new RecordField("ExampleString", 
RecordFieldType.STRING.getDataType()),
+                new RecordField("ExampleJson", 
RecordFieldType.STRING.getDataType()),
+                new RecordField("ExampleBase64Binary", 
RecordFieldType.STRING.getDataType()),
+                new RecordField("ExampleAnyType", 
RecordFieldType.STRING.getDataType()),
+                new RecordField("ExampleDate", 
RecordFieldType.DATE.getDataType("yyyy-mm-dd")),
+                new RecordField("ExampleDateTime", 
RecordFieldType.TIMESTAMP.getDataType("yyyy-mm-dd / hh:mm:ss")),
+                new RecordField("ExampleTime", 
RecordFieldType.TIME.getDataType("hh:mm:ss"))
+        ));
 
         try (final InputStream sfSchema = readFile(TEST_PATH + 
salesforceSchemaFileName)) {
-            final RecordSchema actual = converter.convertSchema(sfSchema, 
fieldNames);
+            final SObjectDescription salesforceObject = 
converter.getSalesforceObject(sfSchema);
+            final RecordSchema actual = 
converter.convertSchema(salesforceObject, fieldNames);
 
             assertEquals(expected, actual);
         }
@@ -142,22 +159,23 @@ class SalesforceToRecordSchemaConverterTest {
     @Test
     void testConvertEmptySchema() throws IOException {
         try (final InputStream sfSchema = new 
ByteArrayInputStream("".getBytes(StandardCharsets.UTF_8))) {
-            assertThrows(MismatchedInputException.class, () -> 
converter.convertSchema(sfSchema, "ExampleField"));
+            assertThrows(MismatchedInputException.class, () -> 
converter.getSalesforceObject(sfSchema));
         }
     }
 
     @Test
-    void testConvertNullSchema() {
+    void testConvertNullSchema() throws IOException {
         final InputStream sfSchema = null;
-        assertThrows(IllegalArgumentException.class, () -> 
converter.convertSchema(sfSchema, "ExampleField"));
+        assertThrows(IllegalArgumentException.class, () -> 
converter.getSalesforceObject(sfSchema));
     }
 
     @Test
     void testConvertUnknownDataType() throws IOException {
         try (final InputStream sfSchema = readFile(TEST_PATH + 
"unknown_type_sf_schema.json")) {
             final String fieldNames = "FieldWithUnknownType";
-            final IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, () -> 
converter.convertSchema(sfSchema, fieldNames));
-            final String errorMessage = "Could not create determine schema for 
'SObjectWithUnknownFieldType'. Could not convert field 'FieldWithUnknownType' 
of soap type 'xsd:unknown'.";
+            final SObjectDescription salesforceObject = 
converter.getSalesforceObject(sfSchema);
+            final IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, () -> 
converter.convertSchema(salesforceObject, fieldNames));
+            final String errorMessage = "Could not determine schema for 
'SObjectWithUnknownFieldType'. Could not convert field 'FieldWithUnknownType' 
of soap type 'xsd:unknown'.";
             assertEquals(errorMessage, exception.getMessage());
         }
     }
diff --git 
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/TestRecordExtender.java
 
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/TestRecordExtender.java
index a55f10b8c1..bd48c5b5b6 100644
--- 
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/TestRecordExtender.java
+++ 
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/TestRecordExtender.java
@@ -94,15 +94,15 @@ class TestRecordExtender {
         int referenceId = 0;
         String objectType = "Account";
 
-        MapRecord testRecord = new MapRecord(ORIGINAL_SCHEMA, new 
HashMap<String, Object>() {{
+        MapRecord testRecord = new MapRecord(ORIGINAL_SCHEMA, new HashMap<>() 
{{
             put("testRecordField1", "testRecordValue1");
             put("testRecordField2", "testRecordValue2");
         }});
 
 
-        MapRecord expectedRecord = new MapRecord(EXPECTED_EXTENDED_SCHEMA, new 
HashMap<String, Object>() {{
+        MapRecord expectedRecord = new MapRecord(EXPECTED_EXTENDED_SCHEMA, new 
HashMap<>() {{
             put("attributes",
-                    new MapRecord(ATTRIBUTES_RECORD_SCHEMA, new 
HashMap<String, Object>() {{
+                    new MapRecord(ATTRIBUTES_RECORD_SCHEMA, new HashMap<>() {{
                         put("type", objectType);
                         put("referenceId", referenceId);
                     }})

Reply via email to