[ https://issues.apache.org/jira/browse/NIFI-4004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16063213#comment-16063213 ]
ASF GitHub Bot commented on NIFI-4004:
--------------------------------------
Github user markap14 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1877#discussion_r124033334
--- Diff:
nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java
---
@@ -114,46 +116,35 @@ private static boolean isSchemaRegistryRequired(final String schemaAccessValue)
                 || HWX_SCHEMA_REF_ATTRIBUTES.getValue().equalsIgnoreCase(schemaAccessValue);
     }
-    public static SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ProcessContext context) {
-        if (allowableValue.equalsIgnoreCase(SCHEMA_NAME_PROPERTY.getValue())) {
-            return new SchemaNamePropertyStrategy(schemaRegistry, context.getProperty(SCHEMA_NAME));
-        } else if (allowableValue.equalsIgnoreCase(SCHEMA_TEXT_PROPERTY.getValue())) {
-            return new AvroSchemaTextStrategy(context.getProperty(SCHEMA_TEXT));
-        } else if (allowableValue.equalsIgnoreCase(HWX_CONTENT_ENCODED_SCHEMA.getValue())) {
-            return new HortonworksEncodedSchemaReferenceStrategy(schemaRegistry);
-        } else if (allowableValue.equalsIgnoreCase(HWX_SCHEMA_REF_ATTRIBUTES.getValue())) {
-            return new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
-        }
-
-        return null;
+    public static boolean isFlowFileRequired(final String schemaAccessValue) {
+        return HWX_SCHEMA_REF_ATTRIBUTES.getValue().equalsIgnoreCase(schemaAccessValue);
     }
-    public static SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ConfigurationContext context) {
-        if (allowableValue.equalsIgnoreCase(SCHEMA_NAME_PROPERTY.getValue())) {
-            return new SchemaNamePropertyStrategy(schemaRegistry, context.getProperty(SCHEMA_NAME));
-        } else if (allowableValue.equalsIgnoreCase(SCHEMA_TEXT_PROPERTY.getValue())) {
-            return new AvroSchemaTextStrategy(context.getProperty(SCHEMA_TEXT));
-        } else if (allowableValue.equalsIgnoreCase(HWX_CONTENT_ENCODED_SCHEMA.getValue())) {
+    private static SchemaAccessStrategy getSchemaAccessStrategy(
+            final String strategy, final SchemaRegistry schemaRegistry, final Function<PropertyDescriptor, PropertyValue> getProperty) {
+        if (strategy.equalsIgnoreCase(SCHEMA_NAME_PROPERTY.getValue())) {
+            return new SchemaNamePropertyStrategy(schemaRegistry, getProperty.apply(SCHEMA_NAME));
+        } else if (strategy.equalsIgnoreCase(SCHEMA_TEXT_PROPERTY.getValue())) {
+            return new AvroSchemaTextStrategy(getProperty.apply(SCHEMA_TEXT));
+        } else if (strategy.equalsIgnoreCase(HWX_CONTENT_ENCODED_SCHEMA.getValue())) {
             return new HortonworksEncodedSchemaReferenceStrategy(schemaRegistry);
-        } else if (allowableValue.equalsIgnoreCase(HWX_SCHEMA_REF_ATTRIBUTES.getValue())) {
+        } else if (strategy.equalsIgnoreCase(HWX_SCHEMA_REF_ATTRIBUTES.getValue())) {
             return new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
         }
         return null;
     }
-    public static SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ValidationContext context) {
-        if (allowableValue.equalsIgnoreCase(SCHEMA_NAME_PROPERTY.getValue())) {
-            return new SchemaNamePropertyStrategy(schemaRegistry, context.getProperty(SCHEMA_NAME));
-        } else if (allowableValue.equalsIgnoreCase(SCHEMA_TEXT_PROPERTY.getValue())) {
-            return new AvroSchemaTextStrategy(context.getProperty(SCHEMA_TEXT));
-        } else if (allowableValue.equalsIgnoreCase(HWX_CONTENT_ENCODED_SCHEMA.getValue())) {
-            return new HortonworksEncodedSchemaReferenceStrategy(schemaRegistry);
-        } else if (allowableValue.equalsIgnoreCase(HWX_SCHEMA_REF_ATTRIBUTES.getValue())) {
-            return new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
-        }
+    public static SchemaAccessStrategy getSchemaAccessStrategy(final String strategy, final SchemaRegistry schemaRegistry, final ProcessContext context) {
+        return getSchemaAccessStrategy(strategy, schemaRegistry, context::getProperty);
--- End diff --
This is a good approach. However, master already contains a change that
updates this method to take just a PropertyContext, so I think this PR only
needs to be rebased.
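
To illustrate the point about taking a single context type, here is a minimal, self-contained sketch. It does not use the real NiFi classes: the `PropertyContext`, `ProcessContext`, `ConfigurationContext` and `ValidationContext` interfaces below are simplified stand-ins that return plain strings, `describeStrategy` is a hypothetical name, and the strategy and property names are illustrative placeholders. The assumption being shown is only that, if all three context types share one property-lookup interface, a single method can replace the three overloads and the `Function` adapter from the diff.

```java
import java.util.Map;

public class SchemaAccessSketch {

    // Hypothetical stand-in for a shared "property lookup" interface.
    interface PropertyContext {
        String getProperty(String name);
    }

    // Hypothetical stand-ins for the three context types the removed overloads accepted.
    interface ProcessContext extends PropertyContext { }
    interface ConfigurationContext extends PropertyContext { }
    interface ValidationContext extends PropertyContext { }

    // One entry point for every caller: it depends only on the shared interface.
    // Returns a description of the chosen strategy, or null for an unknown value,
    // mirroring the null fall-through in the diff.
    static String describeStrategy(final String strategy, final PropertyContext context) {
        if ("schema-name".equalsIgnoreCase(strategy)) {
            return "name:" + context.getProperty("schema-name");
        } else if ("schema-text-property".equalsIgnoreCase(strategy)) {
            return "text:" + context.getProperty("schema-text");
        }
        return null;
    }

    public static void main(final String[] args) {
        final Map<String, String> props = Map.of("schema-name", "orders", "schema-text", "{...}");

        // Each stand-in context has a single abstract method, so a method reference suffices.
        final ProcessContext processContext = props::get;
        final ValidationContext validationContext = props::get;

        // Both context types go through the same method without any overloads.
        System.out.println(describeStrategy("schema-name", processContext));
        System.out.println(describeStrategy("SCHEMA-TEXT-PROPERTY", validationContext));
    }
}
```

Compared with passing `context::getProperty` as a `Function`, this keeps the call sites unchanged and avoids a private helper, at the cost of requiring the shared interface to exist.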
> Refactor RecordReaderFactory and SchemaAccessStrategy to be used without
> incoming FlowFile
> ------------------------------------------------------------------------------------------
>
> Key: NIFI-4004
> URL: https://issues.apache.org/jira/browse/NIFI-4004
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Extensions
> Affects Versions: 1.2.0
> Reporter: Koji Kawamura
> Assignee: Koji Kawamura
>
> The current RecordReaderFactory and SchemaAccessStrategy implementations assume
> that an incoming FlowFile is always available and use it to resolve the Record
> Schema.
> That is fine for components that convert or update incoming FlowFiles.
> However, other components do not have any incoming FlowFiles, for example
> ConsumeKafkaRecord_0_10. Typically, components that fetch data from an external
> system have no incoming FlowFile, and the current API does not fit them well
> because it requires one.
> In fact, [ConsumeKafkaRecord creates a temporary
> FlowFile|https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java#L426]
> only to get the RecordSchema. This should be avoided, as we expect more
> components to start using the Record reader mechanism.
> This JIRA proposes refactoring the current API to allow accessing RecordReaders
> without needing an incoming FlowFile.
> Additionally, since there is a Schema Access Strategy that requires an incoming
> FlowFile containing attribute values to access the schema registry, it would be
> useful to tell the user, when such a RecordReader is specified, that it
> cannot be used.
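
The last point of the description, warning the user when a reader depends on FlowFile attributes, could look roughly like the sketch below. It is only an illustration under stated assumptions: `isFlowFileRequired` mirrors the method of the same name in the diff, but the constant is a plain string with an illustrative value rather than the real allowable value, and `validate`, its `hasIncomingFlowFile` parameter and the message text are hypothetical.

```java
import java.util.Optional;

public class FlowFileRequirementSketch {

    // Illustrative placeholder for the strategy that reads schema reference attributes.
    static final String HWX_SCHEMA_REF_ATTRIBUTES = "hwx-schema-ref-attributes";

    // Same idea as isFlowFileRequired in the diff: only the attribute-based
    // strategy needs an incoming FlowFile.
    static boolean isFlowFileRequired(final String schemaAccessValue) {
        return HWX_SCHEMA_REF_ATTRIBUTES.equalsIgnoreCase(schemaAccessValue);
    }

    // Hypothetical validation step: returns an explanation when the configured
    // strategy cannot work for a component that has no incoming FlowFile.
    static Optional<String> validate(final String schemaAccessValue, final boolean hasIncomingFlowFile) {
        if (!hasIncomingFlowFile && isFlowFileRequired(schemaAccessValue)) {
            return Optional.of("Schema Access Strategy '" + schemaAccessValue
                    + "' reads attributes from an incoming FlowFile, but this component has none");
        }
        return Optional.empty();
    }

    public static void main(final String[] args) {
        // A source component (no incoming FlowFile) configured with the attribute-based strategy.
        validate(HWX_SCHEMA_REF_ATTRIBUTES, false).ifPresent(System.out::println);

        // Any other strategy passes validation, so nothing is printed here.
        validate("schema-name", false).ifPresent(System.out::println);
    }
}
```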