[ 
https://issues.apache.org/jira/browse/NIFI-4004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16063213#comment-16063213
 ] 

ASF GitHub Bot commented on NIFI-4004:
--------------------------------------

Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1877#discussion_r124033334
  
    --- Diff: nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java ---
    @@ -114,46 +116,35 @@ private static boolean isSchemaRegistryRequired(final String schemaAccessValue)
                     || HWX_SCHEMA_REF_ATTRIBUTES.getValue().equalsIgnoreCase(schemaAccessValue);
         }
     
    -    public static SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ProcessContext context) {
    -        if (allowableValue.equalsIgnoreCase(SCHEMA_NAME_PROPERTY.getValue())) {
    -            return new SchemaNamePropertyStrategy(schemaRegistry, context.getProperty(SCHEMA_NAME));
    -        } else if (allowableValue.equalsIgnoreCase(SCHEMA_TEXT_PROPERTY.getValue())) {
    -            return new AvroSchemaTextStrategy(context.getProperty(SCHEMA_TEXT));
    -        } else if (allowableValue.equalsIgnoreCase(HWX_CONTENT_ENCODED_SCHEMA.getValue())) {
    -            return new HortonworksEncodedSchemaReferenceStrategy(schemaRegistry);
    -        } else if (allowableValue.equalsIgnoreCase(HWX_SCHEMA_REF_ATTRIBUTES.getValue())) {
    -            return new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
    -        }
    -
    -        return null;
    +    public static boolean isFlowFileRequired(final String schemaAccessValue) {
    +        return HWX_SCHEMA_REF_ATTRIBUTES.getValue().equalsIgnoreCase(schemaAccessValue);
         }
     
    -    public static SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ConfigurationContext context) {
    -        if (allowableValue.equalsIgnoreCase(SCHEMA_NAME_PROPERTY.getValue())) {
    -            return new SchemaNamePropertyStrategy(schemaRegistry, context.getProperty(SCHEMA_NAME));
    -        } else if (allowableValue.equalsIgnoreCase(SCHEMA_TEXT_PROPERTY.getValue())) {
    -            return new AvroSchemaTextStrategy(context.getProperty(SCHEMA_TEXT));
    -        } else if (allowableValue.equalsIgnoreCase(HWX_CONTENT_ENCODED_SCHEMA.getValue())) {
    +    private static SchemaAccessStrategy getSchemaAccessStrategy(
    +            final String strategy, final SchemaRegistry schemaRegistry, final Function<PropertyDescriptor, PropertyValue> getProperty) {
    +        if (strategy.equalsIgnoreCase(SCHEMA_NAME_PROPERTY.getValue())) {
    +            return new SchemaNamePropertyStrategy(schemaRegistry, getProperty.apply(SCHEMA_NAME));
    +        } else if (strategy.equalsIgnoreCase(SCHEMA_TEXT_PROPERTY.getValue())) {
    +            return new AvroSchemaTextStrategy(getProperty.apply(SCHEMA_TEXT));
    +        } else if (strategy.equalsIgnoreCase(HWX_CONTENT_ENCODED_SCHEMA.getValue())) {
                 return new HortonworksEncodedSchemaReferenceStrategy(schemaRegistry);
    -        } else if (allowableValue.equalsIgnoreCase(HWX_SCHEMA_REF_ATTRIBUTES.getValue())) {
    +        } else if (strategy.equalsIgnoreCase(HWX_SCHEMA_REF_ATTRIBUTES.getValue())) {
                 return new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
             }
     
             return null;
         }
     
    -    public static SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ValidationContext context) {
    -        if (allowableValue.equalsIgnoreCase(SCHEMA_NAME_PROPERTY.getValue())) {
    -            return new SchemaNamePropertyStrategy(schemaRegistry, context.getProperty(SCHEMA_NAME));
    -        } else if (allowableValue.equalsIgnoreCase(SCHEMA_TEXT_PROPERTY.getValue())) {
    -            return new AvroSchemaTextStrategy(context.getProperty(SCHEMA_TEXT));
    -        } else if (allowableValue.equalsIgnoreCase(HWX_CONTENT_ENCODED_SCHEMA.getValue())) {
    -            return new HortonworksEncodedSchemaReferenceStrategy(schemaRegistry);
    -        } else if (allowableValue.equalsIgnoreCase(HWX_SCHEMA_REF_ATTRIBUTES.getValue())) {
    -            return new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
    -        }
    +    public static SchemaAccessStrategy getSchemaAccessStrategy(final String strategy, final SchemaRegistry schemaRegistry, final ProcessContext context) {
    +        return getSchemaAccessStrategy(strategy, schemaRegistry, context::getProperty);
    --- End diff --
    
    This is a good approach. However, on master, there is already a change that 
updates this to take just a PropertyContext, so I think this needs to just be 
rebased.
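
    To illustrate the point for readers of this thread, here is a minimal, self-contained sketch of why a shared lookup interface removes the need for per-context overloads (or for the Function parameter in the diff). Everything in it is a hypothetical stand-in: PropertySource, ProcessCtx, ValidationCtx, describeStrategy and the literal strategy values are invented for illustration and are not the NiFi types or the code on master.

```java
import java.util.Map;

final class StrategySketch {
    // Hypothetical stand-in for a shared property-lookup interface
    // (the role the review comment assigns to PropertyContext).
    interface PropertySource {
        String getProperty(String name);
    }

    // Hypothetical stand-ins for two different context types.
    static final class ProcessCtx implements PropertySource {
        private final Map<String, String> props;
        ProcessCtx(Map<String, String> props) { this.props = props; }
        @Override public String getProperty(String name) { return props.get(name); }
    }

    static final class ValidationCtx implements PropertySource {
        private final Map<String, String> props;
        ValidationCtx(Map<String, String> props) { this.props = props; }
        @Override public String getProperty(String name) { return props.get(name); }
    }

    // Placeholder strategy values, assumed for this sketch only.
    static final String BY_NAME = "schema-name";
    static final String BY_TEXT = "schema-text-property";

    // One method serves every context type because it depends only on the
    // shared interface. It returns a description string instead of a real
    // SchemaAccessStrategy to keep the sketch self-contained.
    static String describeStrategy(String strategy, PropertySource context) {
        if (strategy.equalsIgnoreCase(BY_NAME)) {
            return "by-name:" + context.getProperty("schema-name");
        } else if (strategy.equalsIgnoreCase(BY_TEXT)) {
            return "by-text:" + context.getProperty("schema-text");
        }
        return null; // unknown strategy, mirroring the diff's fallback
    }

    public static void main(String[] args) {
        PropertySource process = new ProcessCtx(Map.of("schema-name", "orders"));
        PropertySource validation = new ValidationCtx(Map.of("schema-text", "{}"));
        System.out.println(describeStrategy(BY_NAME, process));
        System.out.println(describeStrategy(BY_TEXT, validation));
    }
}
```

    With a common supertype, callers pass their context directly, so no method reference such as context::getProperty is needed at each call site.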


> Refactor RecordReaderFactory and SchemaAccessStrategy to be used without 
> incoming FlowFile
> ------------------------------------------------------------------------------------------
>
>                 Key: NIFI-4004
>                 URL: https://issues.apache.org/jira/browse/NIFI-4004
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>    Affects Versions: 1.2.0
>            Reporter: Koji Kawamura
>            Assignee: Koji Kawamura
>
> The current RecordReaderFactory and SchemaAccessStrategy implementations assume 
> that an incoming FlowFile is always available, and use it to resolve the 
> Record Schema.
> That is fine for components that convert or update incoming FlowFiles. 
> However, other components do not have any incoming FlowFiles, for example 
> ConsumeKafkaRecord_0_10. Typically, components that fetch data from an 
> external system have no incoming FlowFile, and the current API does not fit 
> them well because it requires one.
> In fact, [ConsumeKafkaRecord creates a temporary 
> FlowFile|https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java#L426]
>  only to get the RecordSchema. This should be avoided, as we expect more 
> components to start using the Record reader mechanism.
> This JIRA proposes refactoring the current API to allow accessing RecordReaders 
> without needing an incoming FlowFile.
> Additionally, since there is a Schema Access Strategy that requires an incoming 
> FlowFile containing attribute values to access the schema registry, it would 
> be useful to tell the user, when such a RecordReader is specified, that it 
> cannot be used.
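
The last point of the description, warning the user when the chosen strategy depends on FlowFile attributes, might look roughly like the sketch below. It mirrors the isFlowFileRequired check added in the diff, but the class name, the validate helper, its message text and the literal strategy value are assumptions made for illustration, not NiFi code.

```java
import java.util.Optional;

final class FlowFileRequirementSketch {
    // Assumed placeholder for HWX_SCHEMA_REF_ATTRIBUTES.getValue().
    static final String HWX_SCHEMA_REF_ATTRIBUTES = "hwx-schema-ref-attributes";

    // Same shape as the method added in the diff: only the attribute-based
    // strategy needs an incoming FlowFile.
    static boolean isFlowFileRequired(final String schemaAccessValue) {
        return HWX_SCHEMA_REF_ATTRIBUTES.equalsIgnoreCase(schemaAccessValue);
    }

    // Hypothetical helper: returns an explanation when the strategy cannot
    // work for a component without incoming FlowFiles, otherwise empty.
    static Optional<String> validate(final String schemaAccessValue,
                                     final boolean hasIncomingFlowFile) {
        if (isFlowFileRequired(schemaAccessValue) && !hasIncomingFlowFile) {
            return Optional.of("Schema Access Strategy '" + schemaAccessValue
                    + "' reads FlowFile attributes, but this component has no incoming FlowFile");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // A source component (no incoming FlowFile) with each kind of strategy.
        System.out.println(validate("hwx-schema-ref-attributes", false).orElse("valid"));
        System.out.println(validate("schema-name", false).orElse("valid"));
    }
}
```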



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
