Repository: nifi
Updated Branches:
  refs/heads/master 4e4aa54c6 -> 159b64b4c


NIFI-5123: Move SchemaRegistryService to nifi-avro-record-utils

This closes #2661.

Signed-off-by: Andy LoPresto <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/159b64b4
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/159b64b4
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/159b64b4

Branch: refs/heads/master
Commit: 159b64b4c8bb1b32f6ec9ddba3b98e0faa82c72a
Parents: 4e4aa54
Author: Matthew Burgess <[email protected]>
Authored: Thu Apr 26 10:19:41 2018 -0400
Committer: Andy LoPresto <[email protected]>
Committed: Thu Apr 26 11:50:01 2018 -0400

----------------------------------------------------------------------
 .../serialization/SchemaRegistryService.java    | 165 +++++++++++++++++++
 .../serialization/SchemaRegistryService.java    | 165 -------------------
 2 files changed, 165 insertions(+), 165 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/159b64b4/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
new file mode 100644
index 0000000..b299191
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization;
+
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.schema.access.SchemaAccessStrategy;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.schema.access.SchemaField;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.nifi.schema.access.SchemaAccessUtils.HWX_CONTENT_ENCODED_SCHEMA;
+import static 
org.apache.nifi.schema.access.SchemaAccessUtils.HWX_SCHEMA_REF_ATTRIBUTES;
+import static 
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME;
+import static 
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_BRANCH_NAME;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_VERSION;
+import static 
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME_PROPERTY;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REGISTRY;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT;
+import static 
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT_PROPERTY;
+import static 
org.apache.nifi.schema.access.SchemaAccessUtils.CONFLUENT_ENCODED_SCHEMA;
+
+public abstract class SchemaRegistryService extends AbstractControllerService {
+
+    private volatile ConfigurationContext configurationContext;
+    private volatile SchemaAccessStrategy schemaAccessStrategy;
+    private static InputStream EMPTY_INPUT_STREAM = new 
ByteArrayInputStream(new byte[0]);
+
+    private final List<AllowableValue> strategyList = 
Collections.unmodifiableList(Arrays.asList(
+        SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY, HWX_SCHEMA_REF_ATTRIBUTES, 
HWX_CONTENT_ENCODED_SCHEMA, CONFLUENT_ENCODED_SCHEMA));
+
+    protected PropertyDescriptor getSchemaAcessStrategyDescriptor() {
+        return getPropertyDescriptor(SCHEMA_ACCESS_STRATEGY.getName());
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>(2);
+
+        final AllowableValue[] strategies = 
getSchemaAccessStrategyValues().toArray(new AllowableValue[0]);
+        properties.add(new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(SCHEMA_ACCESS_STRATEGY)
+            .allowableValues(strategies)
+            .defaultValue(getDefaultSchemaAccessStrategy().getValue())
+            .build());
+
+        properties.add(SCHEMA_REGISTRY);
+        properties.add(SCHEMA_NAME);
+        properties.add(SCHEMA_VERSION);
+        properties.add(SCHEMA_BRANCH_NAME);
+        properties.add(SCHEMA_TEXT);
+
+        return properties;
+    }
+
+    protected AllowableValue getDefaultSchemaAccessStrategy() {
+        return SCHEMA_NAME_PROPERTY;
+    }
+
+    @OnEnabled
+    public void storeSchemaAccessStrategy(final ConfigurationContext context) {
+        this.configurationContext = context;
+
+        final SchemaRegistry schemaRegistry = 
context.getProperty(SCHEMA_REGISTRY).asControllerService(SchemaRegistry.class);
+
+        final PropertyDescriptor descriptor = 
getSchemaAcessStrategyDescriptor();
+        final String schemaAccess = context.getProperty(descriptor).getValue();
+        this.schemaAccessStrategy = getSchemaAccessStrategy(schemaAccess, 
schemaRegistry, context);
+    }
+
+    @Override
+    protected ConfigurationContext getConfigurationContext() {
+        return configurationContext;
+    }
+
+    protected SchemaAccessStrategy getSchemaAccessStrategy() {
+        return schemaAccessStrategy;
+    }
+
+    public final RecordSchema getSchema(final Map<String, String> variables, 
final InputStream contentStream, final RecordSchema readSchema) throws 
SchemaNotFoundException, IOException {
+        final SchemaAccessStrategy accessStrategy = getSchemaAccessStrategy();
+        if (accessStrategy == null) {
+            throw new SchemaNotFoundException("Could not determine the Schema 
Access Strategy for this service");
+        }
+
+        return getSchemaAccessStrategy().getSchema(variables, contentStream, 
readSchema);
+    }
+
+    public RecordSchema getSchema(final Map<String, String> variables, final 
RecordSchema readSchema) throws SchemaNotFoundException, IOException {
+        return getSchema(variables, EMPTY_INPUT_STREAM, readSchema);
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
+        final String schemaAccessStrategy = 
validationContext.getProperty(getSchemaAcessStrategyDescriptor()).getValue();
+        return 
SchemaAccessUtils.validateSchemaAccessStrategy(validationContext, 
schemaAccessStrategy, getSchemaAccessStrategyValues());
+    }
+
+    protected List<AllowableValue> getSchemaAccessStrategyValues() {
+        return strategyList;
+    }
+
+    protected Set<SchemaField> getSuppliedSchemaFields(final ValidationContext 
validationContext) {
+        final String accessStrategyValue = 
validationContext.getProperty(getSchemaAcessStrategyDescriptor()).getValue();
+        final SchemaRegistry schemaRegistry = 
validationContext.getProperty(SCHEMA_REGISTRY).asControllerService(SchemaRegistry.class);
+        final SchemaAccessStrategy accessStrategy = 
getSchemaAccessStrategy(accessStrategyValue, schemaRegistry, validationContext);
+
+        if (accessStrategy == null) {
+            return EnumSet.noneOf(SchemaField.class);
+        }
+        final Set<SchemaField> suppliedFields = 
accessStrategy.getSuppliedSchemaFields();
+        return suppliedFields;
+    }
+
+    protected SchemaAccessStrategy getSchemaAccessStrategy(final String 
strategy, final SchemaRegistry schemaRegistry, final ConfigurationContext 
context) {
+        if (strategy == null) {
+            return null;
+        }
+
+        return SchemaAccessUtils.getSchemaAccessStrategy(strategy, 
schemaRegistry, context);
+    }
+
+    protected SchemaAccessStrategy getSchemaAccessStrategy(final String 
allowableValue, final SchemaRegistry schemaRegistry, final ValidationContext 
context) {
+        if (allowableValue == null) {
+            return null;
+        }
+        return SchemaAccessUtils.getSchemaAccessStrategy(allowableValue, 
schemaRegistry, context);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/159b64b4/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
deleted file mode 100644
index b299191..0000000
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.serialization;
-
-import org.apache.nifi.annotation.lifecycle.OnEnabled;
-import org.apache.nifi.components.AllowableValue;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.schema.access.SchemaAccessStrategy;
-import org.apache.nifi.schema.access.SchemaAccessUtils;
-import org.apache.nifi.schema.access.SchemaField;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
-import org.apache.nifi.schemaregistry.services.SchemaRegistry;
-import org.apache.nifi.serialization.record.RecordSchema;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import static 
org.apache.nifi.schema.access.SchemaAccessUtils.HWX_CONTENT_ENCODED_SCHEMA;
-import static 
org.apache.nifi.schema.access.SchemaAccessUtils.HWX_SCHEMA_REF_ATTRIBUTES;
-import static 
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY;
-import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME;
-import static 
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_BRANCH_NAME;
-import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_VERSION;
-import static 
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME_PROPERTY;
-import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REGISTRY;
-import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT;
-import static 
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT_PROPERTY;
-import static 
org.apache.nifi.schema.access.SchemaAccessUtils.CONFLUENT_ENCODED_SCHEMA;
-
-public abstract class SchemaRegistryService extends AbstractControllerService {
-
-    private volatile ConfigurationContext configurationContext;
-    private volatile SchemaAccessStrategy schemaAccessStrategy;
-    private static InputStream EMPTY_INPUT_STREAM = new 
ByteArrayInputStream(new byte[0]);
-
-    private final List<AllowableValue> strategyList = 
Collections.unmodifiableList(Arrays.asList(
-        SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY, HWX_SCHEMA_REF_ATTRIBUTES, 
HWX_CONTENT_ENCODED_SCHEMA, CONFLUENT_ENCODED_SCHEMA));
-
-    protected PropertyDescriptor getSchemaAcessStrategyDescriptor() {
-        return getPropertyDescriptor(SCHEMA_ACCESS_STRATEGY.getName());
-    }
-
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        final List<PropertyDescriptor> properties = new ArrayList<>(2);
-
-        final AllowableValue[] strategies = 
getSchemaAccessStrategyValues().toArray(new AllowableValue[0]);
-        properties.add(new PropertyDescriptor.Builder()
-            .fromPropertyDescriptor(SCHEMA_ACCESS_STRATEGY)
-            .allowableValues(strategies)
-            .defaultValue(getDefaultSchemaAccessStrategy().getValue())
-            .build());
-
-        properties.add(SCHEMA_REGISTRY);
-        properties.add(SCHEMA_NAME);
-        properties.add(SCHEMA_VERSION);
-        properties.add(SCHEMA_BRANCH_NAME);
-        properties.add(SCHEMA_TEXT);
-
-        return properties;
-    }
-
-    protected AllowableValue getDefaultSchemaAccessStrategy() {
-        return SCHEMA_NAME_PROPERTY;
-    }
-
-    @OnEnabled
-    public void storeSchemaAccessStrategy(final ConfigurationContext context) {
-        this.configurationContext = context;
-
-        final SchemaRegistry schemaRegistry = 
context.getProperty(SCHEMA_REGISTRY).asControllerService(SchemaRegistry.class);
-
-        final PropertyDescriptor descriptor = 
getSchemaAcessStrategyDescriptor();
-        final String schemaAccess = context.getProperty(descriptor).getValue();
-        this.schemaAccessStrategy = getSchemaAccessStrategy(schemaAccess, 
schemaRegistry, context);
-    }
-
-    @Override
-    protected ConfigurationContext getConfigurationContext() {
-        return configurationContext;
-    }
-
-    protected SchemaAccessStrategy getSchemaAccessStrategy() {
-        return schemaAccessStrategy;
-    }
-
-    public final RecordSchema getSchema(final Map<String, String> variables, 
final InputStream contentStream, final RecordSchema readSchema) throws 
SchemaNotFoundException, IOException {
-        final SchemaAccessStrategy accessStrategy = getSchemaAccessStrategy();
-        if (accessStrategy == null) {
-            throw new SchemaNotFoundException("Could not determine the Schema 
Access Strategy for this service");
-        }
-
-        return getSchemaAccessStrategy().getSchema(variables, contentStream, 
readSchema);
-    }
-
-    public RecordSchema getSchema(final Map<String, String> variables, final 
RecordSchema readSchema) throws SchemaNotFoundException, IOException {
-        return getSchema(variables, EMPTY_INPUT_STREAM, readSchema);
-    }
-
-    @Override
-    protected Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
-        final String schemaAccessStrategy = 
validationContext.getProperty(getSchemaAcessStrategyDescriptor()).getValue();
-        return 
SchemaAccessUtils.validateSchemaAccessStrategy(validationContext, 
schemaAccessStrategy, getSchemaAccessStrategyValues());
-    }
-
-    protected List<AllowableValue> getSchemaAccessStrategyValues() {
-        return strategyList;
-    }
-
-    protected Set<SchemaField> getSuppliedSchemaFields(final ValidationContext 
validationContext) {
-        final String accessStrategyValue = 
validationContext.getProperty(getSchemaAcessStrategyDescriptor()).getValue();
-        final SchemaRegistry schemaRegistry = 
validationContext.getProperty(SCHEMA_REGISTRY).asControllerService(SchemaRegistry.class);
-        final SchemaAccessStrategy accessStrategy = 
getSchemaAccessStrategy(accessStrategyValue, schemaRegistry, validationContext);
-
-        if (accessStrategy == null) {
-            return EnumSet.noneOf(SchemaField.class);
-        }
-        final Set<SchemaField> suppliedFields = 
accessStrategy.getSuppliedSchemaFields();
-        return suppliedFields;
-    }
-
-    protected SchemaAccessStrategy getSchemaAccessStrategy(final String 
strategy, final SchemaRegistry schemaRegistry, final ConfigurationContext 
context) {
-        if (strategy == null) {
-            return null;
-        }
-
-        return SchemaAccessUtils.getSchemaAccessStrategy(strategy, 
schemaRegistry, context);
-    }
-
-    protected SchemaAccessStrategy getSchemaAccessStrategy(final String 
allowableValue, final SchemaRegistry schemaRegistry, final ValidationContext 
context) {
-        if (allowableValue == null) {
-            return null;
-        }
-        return SchemaAccessUtils.getSchemaAccessStrategy(allowableValue, 
schemaRegistry, context);
-    }
-
-}

Reply via email to