exceptionfactory commented on code in PR #8005: URL: https://github.com/apache/nifi/pull/8005#discussion_r1416003586
########## nifi-commons/nifi-json-schema-api/src/main/java/org/apache/nifi/schema/access/JsonSchema.java: ########## @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schema.access; + +import java.util.Objects; + +public class JsonSchema { + private final SchemaVersion schemaDraftVersion; Review Comment: Recommend renaming this to just `schemaVersion` removing `draft` from the name. ```suggestion private final SchemaVersion schemaVersion; ``` ########## nifi-commons/nifi-json-schema-api/src/main/java/org/apache/nifi/schema/access/JsonSchema.java: ########## @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schema.access; + +import java.util.Objects; + +public class JsonSchema { + private final SchemaVersion schemaDraftVersion; + private final String schemaText; + + public JsonSchema(SchemaVersion schemaDraftVersion, String schemaText) { + Objects.requireNonNull(schemaDraftVersion, "Schema draft version cannot be null"); + Objects.requireNonNull(schemaText, "The text of the schema cannot be null"); + this.schemaDraftVersion = schemaDraftVersion; + this.schemaText = schemaText; + } + + public SchemaVersion getSchemaDraftVersion() { Review Comment: ```suggestion public SchemaVersion getSchemaVersion() { ``` ########## nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/InMemoryJsonSchemaRegistry.java: ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schemaregistry.services; + +import com.networknt.schema.JsonSchemaFactory; +import com.networknt.schema.SpecVersion; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.JsonSchema; +import org.apache.nifi.schema.access.JsonSchemaRegistryComponent; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.schema.access.SchemaVersion; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.commons.lang3.StringUtils.isNotBlank; + +@Tags({"schema", "registry", "json"}) +@CapabilityDescription("Provides a service for registering and accessing schemas. One can register a schema " + + "as a dynamic property where 'name' represents the schema name and 'value' represents the textual " + + "representation of the actual schema following the syntax and semantics of JSON's Schema format. " + + "Empty schemas and schemas only consisting of whitespace are not acceptable schemas." + + "The registry is heterogeneous registry as it can store schemas of multiple schema draft versions. " + + "By default the registry is configured to store schemas of Draft 2020-12. When a schema is added, the version " + + "which is currently is set, is what the schema is saved as.") +@DynamicProperty(name = "Schema name", value = "Schema Content", + description = "Adds a named schema using the JSON string representation of a JSON schema", + expressionLanguageScope = ExpressionLanguageScope.NONE) +public class InMemoryJsonSchemaRegistry extends AbstractControllerService implements JsonSchemaRegistry, JsonSchemaRegistryComponent { + + private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Collections.singletonList(SCHEMA_VERSION); + + private final ConcurrentMap<String, JsonSchema> jsonSchemas; + private final ConcurrentMap<SchemaVersion, JsonSchemaFactory> schemaFactories; + private volatile SchemaVersion schemaVersion; + + public InMemoryJsonSchemaRegistry() { + jsonSchemas = new ConcurrentHashMap<>(); + schemaFactories = Arrays.stream(SchemaVersion.values()) + .collect(Collectors.toConcurrentMap(Function.identity(), + schemaDraftVersion -> JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.fromId(schemaDraftVersion.getUri()).get()))); + schemaVersion = SchemaVersion.valueOf(SCHEMA_VERSION.getDefaultValue()); + } + + @Override + public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { + if (SCHEMA_VERSION.getName().equals(descriptor.getName()) && !newValue.equals(oldValue)) { + schemaVersion = SchemaVersion.valueOf(newValue); + } else if(descriptor.isDynamic() && isBlank(newValue)) { + jsonSchemas.remove(descriptor.getName()); + } else if (descriptor.isDynamic() && isNotBlank(newValue)) { + try { + final String schemaName = descriptor.getName(); + final JsonSchemaFactory jsonSchemaFactory = schemaFactories.get(schemaVersion); + jsonSchemaFactory.getSchema(newValue); + jsonSchemas.put(schemaName, new JsonSchema(schemaVersion, newValue)); + } catch (final Exception e) { + // not a problem - the service won't be valid and the validation message will indicate what is wrong. Review Comment: Perhaps log this at the debug level for potential troubleshooting. ########## nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/InMemoryJsonSchemaRegistry.java: ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schemaregistry.services; + +import com.networknt.schema.JsonSchemaFactory; +import com.networknt.schema.SpecVersion; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.JsonSchema; +import org.apache.nifi.schema.access.JsonSchemaRegistryComponent; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.schema.access.SchemaVersion; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.commons.lang3.StringUtils.isNotBlank; + +@Tags({"schema", "registry", "json"}) +@CapabilityDescription("Provides a service for registering and accessing schemas. One can register a schema " + + "as a dynamic property where 'name' represents the schema name and 'value' represents the textual " + + "representation of the actual schema following the syntax and semantics of JSON's Schema format. " + + "Empty schemas and schemas only consisting of whitespace are not acceptable schemas." + + "The registry is heterogeneous registry as it can store schemas of multiple schema draft versions. " + + "By default the registry is configured to store schemas of Draft 2020-12. When a schema is added, the version " + + "which is currently is set, is what the schema is saved as.") +@DynamicProperty(name = "Schema name", value = "Schema Content", Review Comment: ```suggestion @DynamicProperty(name = "Schema Name", value = "Schema Content", ``` ########## nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/InMemoryJsonSchemaRegistry.java: ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schemaregistry.services; + +import com.networknt.schema.JsonSchemaFactory; +import com.networknt.schema.SpecVersion; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.JsonSchema; +import org.apache.nifi.schema.access.JsonSchemaRegistryComponent; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.schema.access.SchemaVersion; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.commons.lang3.StringUtils.isNotBlank; + +@Tags({"schema", "registry", "json"}) +@CapabilityDescription("Provides a service for registering and accessing schemas. One can register a schema " + + "as a dynamic property where 'name' represents the schema name and 'value' represents the textual " + + "representation of the actual schema following the syntax and semantics of JSON's Schema format. " + + "Empty schemas and schemas only consisting of whitespace are not acceptable schemas." + + "The registry is heterogeneous registry as it can store schemas of multiple schema draft versions. " + + "By default the registry is configured to store schemas of Draft 2020-12. When a schema is added, the version " + + "which is currently is set, is what the schema is saved as.") +@DynamicProperty(name = "Schema name", value = "Schema Content", + description = "Adds a named schema using the JSON string representation of a JSON schema", + expressionLanguageScope = ExpressionLanguageScope.NONE) +public class InMemoryJsonSchemaRegistry extends AbstractControllerService implements JsonSchemaRegistry, JsonSchemaRegistryComponent { + + private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Collections.singletonList(SCHEMA_VERSION); + + private final ConcurrentMap<String, JsonSchema> jsonSchemas; + private final ConcurrentMap<SchemaVersion, JsonSchemaFactory> schemaFactories; + private volatile SchemaVersion schemaVersion; + + public InMemoryJsonSchemaRegistry() { + jsonSchemas = new ConcurrentHashMap<>(); + schemaFactories = Arrays.stream(SchemaVersion.values()) + .collect(Collectors.toConcurrentMap(Function.identity(), + schemaDraftVersion -> JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.fromId(schemaDraftVersion.getUri()).get()))); + schemaVersion = SchemaVersion.valueOf(SCHEMA_VERSION.getDefaultValue()); + } + + @Override + public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { + if (SCHEMA_VERSION.getName().equals(descriptor.getName()) && !newValue.equals(oldValue)) { + schemaVersion = SchemaVersion.valueOf(newValue); + } else if(descriptor.isDynamic() && isBlank(newValue)) { + jsonSchemas.remove(descriptor.getName()); + } else if (descriptor.isDynamic() && isNotBlank(newValue)) { + try { + final String schemaName = descriptor.getName(); + final JsonSchemaFactory jsonSchemaFactory = schemaFactories.get(schemaVersion); + jsonSchemaFactory.getSchema(newValue); + jsonSchemas.put(schemaName, new JsonSchema(schemaVersion, newValue)); + } catch (final Exception e) { + // not a problem - the service won't be valid and the validation message will indicate what is wrong. + } + } + } + + @Override + protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { + final Set<ValidationResult> results = new HashSet<>(); + + final boolean noEnteredSchema = validationContext.getProperties().keySet().stream() + .noneMatch(PropertyDescriptor::isDynamic); + if(noEnteredSchema) { + results.add(new ValidationResult.Builder() + .subject("Supported Dynamic Property Descriptor") + .valid(false) + .explanation("There must be at least one JSON schema specified") + .build()); + } else { + // Iterate over dynamic properties, validating only newly added schemas, and adding results + schemaVersion = SchemaVersion.valueOf(validationContext.getProperty(JsonSchemaRegistryComponent.SCHEMA_VERSION).getValue()); + validationContext.getProperties().entrySet().stream() + .filter(entry -> entry.getKey().isDynamic() && !jsonSchemas.containsKey(entry.getKey().getName())) + .forEach(entry -> { + String subject = entry.getKey().getName(); + String input = entry.getValue(); + if (isNotBlank(input)) { + try { + final JsonSchemaFactory jsonSchemaFactory = schemaFactories.get(schemaVersion); + jsonSchemaFactory.getSchema(input); + } catch (Exception e) { + results.add(new ValidationResult.Builder() + .input(input) + .subject(subject) + .valid(false) + .explanation("Not a valid JSON Schema: " + e.getMessage()) + .build()); + } + } + }); + } + + return results; + } + + @Override + public JsonSchema retrieveSchema(final String schemaName) throws SchemaNotFoundException { + if(!jsonSchemas.containsKey(schemaName)) { + throw new SchemaNotFoundException("Unable to find schema with name '" + schemaName + "'"); + } + return jsonSchemas.get(schemaName); Review Comment: It seems like this could be simplified to call `jsonSchemas.get(schemaName)`, and if the return value is `null`, throw the exception. It is minor, but makes it simpler with just one call to the Map. ########## nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/InMemoryJsonSchemaRegistry.java: ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schemaregistry.services; + +import com.networknt.schema.JsonSchemaFactory; +import com.networknt.schema.SpecVersion; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.JsonSchema; +import org.apache.nifi.schema.access.JsonSchemaRegistryComponent; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.schema.access.SchemaVersion; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.commons.lang3.StringUtils.isNotBlank; + +@Tags({"schema", "registry", "json"}) +@CapabilityDescription("Provides a service for registering and accessing schemas. One can register a schema " + + "as a dynamic property where 'name' represents the schema name and 'value' represents the textual " + + "representation of the actual schema following the syntax and semantics of JSON's Schema format. " + + "Empty schemas and schemas only consisting of whitespace are not acceptable schemas." + + "The registry is heterogeneous registry as it can store schemas of multiple schema draft versions. " + + "By default the registry is configured to store schemas of Draft 2020-12. When a schema is added, the version " + + "which is currently is set, is what the schema is saved as.") +@DynamicProperty(name = "Schema name", value = "Schema Content", + description = "Adds a named schema using the JSON string representation of a JSON schema", + expressionLanguageScope = ExpressionLanguageScope.NONE) +public class InMemoryJsonSchemaRegistry extends AbstractControllerService implements JsonSchemaRegistry, JsonSchemaRegistryComponent { Review Comment: It seems like a different name would be better for this implementation. The JSON Schema definitions are available in memory, but they persist in the flow configuration based on properties. One option might be `DynamicPropertyJsonSchemaRegistry`, but that may not be optimal. Perhaps `PropertySourceJsonSchemaRegistry`, `PropertyValueJsonSchemaRegistry`, or `PropertyConfiguredJsonSchemaRegistry`? Any other ideas? ########## nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/InMemoryJsonSchemaRegistry.java: ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schemaregistry.services; + +import com.networknt.schema.JsonSchemaFactory; +import com.networknt.schema.SpecVersion; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.JsonSchema; +import org.apache.nifi.schema.access.JsonSchemaRegistryComponent; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.schema.access.SchemaVersion; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.commons.lang3.StringUtils.isNotBlank; + +@Tags({"schema", "registry", "json"}) +@CapabilityDescription("Provides a service for registering and accessing schemas. One can register a schema " + + "as a dynamic property where 'name' represents the schema name and 'value' represents the textual " + + "representation of the actual schema following the syntax and semantics of JSON's Schema format. " + + "Empty schemas and schemas only consisting of whitespace are not acceptable schemas." + + "The registry is heterogeneous registry as it can store schemas of multiple schema draft versions. " Review Comment: ```suggestion + "The registry is heterogeneous as it can store schemas of different schema draft versions. " ``` ########## nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml: ########## @@ -52,5 +57,15 @@ language governing permissions and limitations under the License. --> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> </dependency> + <dependency> + <groupId>com.networknt</groupId> + <artifactId>json-schema-validator</artifactId> + <version>1.0.86</version> Review Comment: The latest version is now 1.0.87. ```suggestion <version>1.0.87</version> ``` ########## nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/InMemoryJsonSchemaRegistry.java: ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schemaregistry.services; + +import com.networknt.schema.JsonSchemaFactory; +import com.networknt.schema.SpecVersion; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.JsonSchema; +import org.apache.nifi.schema.access.JsonSchemaRegistryComponent; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.schema.access.SchemaVersion; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.commons.lang3.StringUtils.isNotBlank; + +@Tags({"schema", "registry", "json"}) +@CapabilityDescription("Provides a service for registering and accessing schemas. One can register a schema " + + "as a dynamic property where 'name' represents the schema name and 'value' represents the textual " + + "representation of the actual schema following the syntax and semantics of JSON's Schema format. " + + "Empty schemas and schemas only consisting of whitespace are not acceptable schemas." + + "The registry is heterogeneous registry as it can store schemas of multiple schema draft versions. " + + "By default the registry is configured to store schemas of Draft 2020-12. When a schema is added, the version " + + "which is currently is set, is what the schema is saved as.") +@DynamicProperty(name = "Schema name", value = "Schema Content", + description = "Adds a named schema using the JSON string representation of a JSON schema", + expressionLanguageScope = ExpressionLanguageScope.NONE) +public class InMemoryJsonSchemaRegistry extends AbstractControllerService implements JsonSchemaRegistry, JsonSchemaRegistryComponent { + + private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Collections.singletonList(SCHEMA_VERSION); + + private final ConcurrentMap<String, JsonSchema> jsonSchemas; + private final ConcurrentMap<SchemaVersion, JsonSchemaFactory> schemaFactories; + private volatile SchemaVersion schemaVersion; + + public InMemoryJsonSchemaRegistry() { + jsonSchemas = new ConcurrentHashMap<>(); + schemaFactories = Arrays.stream(SchemaVersion.values()) + .collect(Collectors.toConcurrentMap(Function.identity(), + schemaDraftVersion -> JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.fromId(schemaDraftVersion.getUri()).get()))); + schemaVersion = SchemaVersion.valueOf(SCHEMA_VERSION.getDefaultValue()); + } + + @Override + public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { + if (SCHEMA_VERSION.getName().equals(descriptor.getName()) && !newValue.equals(oldValue)) { + schemaVersion = SchemaVersion.valueOf(newValue); + } else if(descriptor.isDynamic() && isBlank(newValue)) { + jsonSchemas.remove(descriptor.getName()); + } else if (descriptor.isDynamic() && isNotBlank(newValue)) { + try { + final String schemaName = descriptor.getName(); + final JsonSchemaFactory jsonSchemaFactory = schemaFactories.get(schemaVersion); + jsonSchemaFactory.getSchema(newValue); + jsonSchemas.put(schemaName, new JsonSchema(schemaVersion, newValue)); + } catch (final Exception e) { + // not a problem - the service won't be valid and the validation message will indicate what is wrong. + } + } + } + + @Override + protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { + final Set<ValidationResult> results = new HashSet<>(); + + final boolean noEnteredSchema = validationContext.getProperties().keySet().stream() + .noneMatch(PropertyDescriptor::isDynamic); + if(noEnteredSchema) { Review Comment: Spacing: ```suggestion if (noEnteredSchema) { ``` ########## nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateJson.java: ########## @@ -221,4 +292,28 @@ private static String getFileContent(final String filename) { private static String getRelativeResourcePath(final String filename) { return String.format("/%s/%s", TestValidateJson.class.getSimpleName(), filename); } + + private static class SampleJsonSchemaRegistry extends AbstractControllerService implements JsonSchemaRegistry { + private final String identifier; + private final String schemaName; + + public SampleJsonSchemaRegistry(String identifier, String schemaName) { + this.identifier = identifier; + this.schemaName = schemaName; + } + + @Override + public String getIdentifier() { + return identifier; + } + + @Override + public JsonSchema retrieveSchema(String schemaName) throws SchemaNotFoundException { + if(StringUtils.isNotBlank(schemaName) && this.schemaName.equals(schemaName)) { Review Comment: It looks like the `isNotBlank()` check is unnecessary, since `this.schemaName` is always defined. ```suggestion if (this.schemaName.equals(schemaName)) { ``` ########## nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/InMemoryJsonSchemaRegistry.java: ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schemaregistry.services; + +import com.networknt.schema.JsonSchemaFactory; +import com.networknt.schema.SpecVersion; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.JsonSchema; +import org.apache.nifi.schema.access.JsonSchemaRegistryComponent; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.schema.access.SchemaVersion; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.commons.lang3.StringUtils.isNotBlank; + +@Tags({"schema", "registry", "json"}) +@CapabilityDescription("Provides a service for registering and accessing schemas. One can register a schema " + + "as a dynamic property where 'name' represents the schema name and 'value' represents the textual " + + "representation of the actual schema following the syntax and semantics of JSON's Schema format. " Review Comment: ```suggestion + "representation of the actual schema following the syntax and semantics of the JSON Schema format. " ``` ########## nifi-commons/nifi-json-schema-shared/src/main/java/org/apache/nifi/schema/access/JsonSchemaRegistryComponent.java: ########## @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schema.access; + +import org.apache.nifi.components.PropertyDescriptor; + +public interface JsonSchemaRegistryComponent { + PropertyDescriptor SCHEMA_VERSION = new PropertyDescriptor + .Builder().name("Schema Version") + .displayName("JSON Draft Schema Version") Review Comment: The name and displayName should match. ```suggestion .Builder() .name("JSON Schema Version") .displayName("JSON Schema Version") ``` ########## nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/InMemoryJsonSchemaRegistry.java: ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schemaregistry.services; + +import com.networknt.schema.JsonSchemaFactory; +import com.networknt.schema.SpecVersion; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.JsonSchema; +import org.apache.nifi.schema.access.JsonSchemaRegistryComponent; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.schema.access.SchemaVersion; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.commons.lang3.StringUtils.isNotBlank; + +@Tags({"schema", "registry", "json"}) +@CapabilityDescription("Provides a service for registering and accessing schemas. One can register a schema " + + "as a dynamic property where 'name' represents the schema name and 'value' represents the textual " + + "representation of the actual schema following the syntax and semantics of JSON's Schema format. " + + "Empty schemas and schemas only consisting of whitespace are not acceptable schemas." + + "The registry is heterogeneous registry as it can store schemas of multiple schema draft versions. " + + "By default the registry is configured to store schemas of Draft 2020-12. When a schema is added, the version " + + "which is currently is set, is what the schema is saved as.") +@DynamicProperty(name = "Schema name", value = "Schema Content", + description = "Adds a named schema using the JSON string representation of a JSON schema", + expressionLanguageScope = ExpressionLanguageScope.NONE) +public class InMemoryJsonSchemaRegistry extends AbstractControllerService implements JsonSchemaRegistry, JsonSchemaRegistryComponent { + + private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Collections.singletonList(SCHEMA_VERSION); + + private final ConcurrentMap<String, JsonSchema> jsonSchemas; + private final ConcurrentMap<SchemaVersion, JsonSchemaFactory> schemaFactories; + private volatile SchemaVersion schemaVersion; + + public InMemoryJsonSchemaRegistry() { + jsonSchemas = new ConcurrentHashMap<>(); + schemaFactories = Arrays.stream(SchemaVersion.values()) + .collect(Collectors.toConcurrentMap(Function.identity(), + schemaDraftVersion -> JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.fromId(schemaDraftVersion.getUri()).get()))); + schemaVersion = SchemaVersion.valueOf(SCHEMA_VERSION.getDefaultValue()); + } + + @Override + public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { + if (SCHEMA_VERSION.getName().equals(descriptor.getName()) && !newValue.equals(oldValue)) { + schemaVersion = SchemaVersion.valueOf(newValue); + } else if(descriptor.isDynamic() && isBlank(newValue)) { + jsonSchemas.remove(descriptor.getName()); + } else if (descriptor.isDynamic() && isNotBlank(newValue)) { + try { + final String schemaName = descriptor.getName(); + final JsonSchemaFactory jsonSchemaFactory = schemaFactories.get(schemaVersion); + jsonSchemaFactory.getSchema(newValue); + jsonSchemas.put(schemaName, new JsonSchema(schemaVersion, newValue)); + } catch (final Exception e) { + // not a problem - the service won't be valid and the validation message will indicate what is wrong. + } + } + } + + @Override + protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { + final Set<ValidationResult> results = new HashSet<>(); + + final boolean noEnteredSchema = validationContext.getProperties().keySet().stream() Review Comment: Perhaps name this `noSchemasConfigured`? ########## nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/InMemoryJsonSchemaRegistry.java: ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schemaregistry.services; + +import com.networknt.schema.JsonSchemaFactory; +import com.networknt.schema.SpecVersion; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.JsonSchema; +import org.apache.nifi.schema.access.JsonSchemaRegistryComponent; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.schema.access.SchemaVersion; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.commons.lang3.StringUtils.isNotBlank; + +@Tags({"schema", "registry", "json"}) +@CapabilityDescription("Provides a service for registering and accessing schemas. One can register a schema " Review Comment: ```suggestion @CapabilityDescription("Provides a service for registering and accessing JSON schemas. One can register a schema " ``` ########## nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/InMemoryJsonSchemaRegistry.java: ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schemaregistry.services; + +import com.networknt.schema.JsonSchemaFactory; +import com.networknt.schema.SpecVersion; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.JsonSchema; +import org.apache.nifi.schema.access.JsonSchemaRegistryComponent; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.schema.access.SchemaVersion; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.commons.lang3.StringUtils.isNotBlank; + +@Tags({"schema", "registry", "json"}) +@CapabilityDescription("Provides a service for registering and accessing schemas. One can register a schema " + + "as a dynamic property where 'name' represents the schema name and 'value' represents the textual " + + "representation of the actual schema following the syntax and semantics of JSON's Schema format. " + + "Empty schemas and schemas only consisting of whitespace are not acceptable schemas." + + "The registry is heterogeneous registry as it can store schemas of multiple schema draft versions. " + + "By default the registry is configured to store schemas of Draft 2020-12. When a schema is added, the version " + + "which is currently is set, is what the schema is saved as.") +@DynamicProperty(name = "Schema name", value = "Schema Content", + description = "Adds a named schema using the JSON string representation of a JSON schema", + expressionLanguageScope = ExpressionLanguageScope.NONE) +public class InMemoryJsonSchemaRegistry extends AbstractControllerService implements JsonSchemaRegistry, JsonSchemaRegistryComponent { + + private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Collections.singletonList(SCHEMA_VERSION); + + private final ConcurrentMap<String, JsonSchema> jsonSchemas; + private final ConcurrentMap<SchemaVersion, JsonSchemaFactory> schemaFactories; + private volatile SchemaVersion schemaVersion; + + public InMemoryJsonSchemaRegistry() { + jsonSchemas = new ConcurrentHashMap<>(); + schemaFactories = Arrays.stream(SchemaVersion.values()) + .collect(Collectors.toConcurrentMap(Function.identity(), + schemaDraftVersion -> JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.fromId(schemaDraftVersion.getUri()).get()))); + schemaVersion = SchemaVersion.valueOf(SCHEMA_VERSION.getDefaultValue()); + } + + @Override + public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { + if (SCHEMA_VERSION.getName().equals(descriptor.getName()) && !newValue.equals(oldValue)) { + schemaVersion = SchemaVersion.valueOf(newValue); + } else if(descriptor.isDynamic() && isBlank(newValue)) { + jsonSchemas.remove(descriptor.getName()); + } else if (descriptor.isDynamic() && isNotBlank(newValue)) { + try { + final String schemaName = descriptor.getName(); + final JsonSchemaFactory jsonSchemaFactory = schemaFactories.get(schemaVersion); + jsonSchemaFactory.getSchema(newValue); + jsonSchemas.put(schemaName, new JsonSchema(schemaVersion, newValue)); + } catch (final Exception e) { + // not a problem - the service won't be valid and the validation message will indicate what is wrong. + } + } + } + + @Override + protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { + final Set<ValidationResult> results = new HashSet<>(); + + final boolean noEnteredSchema = validationContext.getProperties().keySet().stream() + .noneMatch(PropertyDescriptor::isDynamic); + if(noEnteredSchema) { + results.add(new ValidationResult.Builder() + .subject("Supported Dynamic Property Descriptor") + .valid(false) + .explanation("There must be at least one JSON schema specified") + .build()); + } else { + // Iterate over dynamic properties, validating only newly added schemas, and adding results + schemaVersion = SchemaVersion.valueOf(validationContext.getProperty(JsonSchemaRegistryComponent.SCHEMA_VERSION).getValue()); + validationContext.getProperties().entrySet().stream() + .filter(entry -> entry.getKey().isDynamic() && !jsonSchemas.containsKey(entry.getKey().getName())) + .forEach(entry -> { + String subject = entry.getKey().getName(); + String input = entry.getValue(); + if (isNotBlank(input)) { + try { + final JsonSchemaFactory jsonSchemaFactory = schemaFactories.get(schemaVersion); + jsonSchemaFactory.getSchema(input); + } catch (Exception e) { + results.add(new ValidationResult.Builder() + .input(input) + .subject(subject) + .valid(false) + .explanation("Not a valid JSON Schema: " + e.getMessage()) + .build()); + } + } + }); + } + + return results; + } + + @Override + public JsonSchema retrieveSchema(final String schemaName) throws SchemaNotFoundException { + if(!jsonSchemas.containsKey(schemaName)) { Review Comment: ```suggestion if (!jsonSchemas.containsKey(schemaName)) { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
