http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java new file mode 100644 index 0000000..6676ee6 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java @@ -0,0 +1,541 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.hadoop; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.configuration.DefaultSettings; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.FlowFileAccessException; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.hadoop.exception.FailureException; +import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException; +import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter; +import org.apache.nifi.schema.access.SchemaAccessStrategy; +import org.apache.nifi.schema.access.SchemaAccessUtils; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.schemaregistry.services.SchemaRegistry; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.RecordSchema; +import 
org.apache.nifi.serialization.record.RecordSet; +import org.apache.nifi.util.StopWatch; + +import java.io.BufferedInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.security.PrivilegedAction; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.nifi.schema.access.SchemaAccessUtils.HWX_CONTENT_ENCODED_SCHEMA; +import static org.apache.nifi.schema.access.SchemaAccessUtils.HWX_SCHEMA_REF_ATTRIBUTES; +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY; +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME; +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME_PROPERTY; +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REGISTRY; +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT; +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT_PROPERTY; + +/** + * Base class for processors that write Records to HDFS. 
+ */ +@TriggerWhenEmpty // trigger when empty so we have a chance to perform a Kerberos re-login +@DefaultSettings(yieldDuration = "100 ms") // decrease the default yield since we are triggering when empty +public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor { + + + public static final PropertyDescriptor COMPRESSION_TYPE = new PropertyDescriptor.Builder() + .name("compression-type") + .displayName("Compression Type") + .description("The type of compression for the file being written.") + .required(true) + .build(); + + public static final PropertyDescriptor OVERWRITE = new PropertyDescriptor.Builder() + .name("overwrite") + .displayName("Overwrite Files") + .description("Whether or not to overwrite existing files in the same directory with the same name. When set to false, " + + "flow files will be routed to failure when a file exists in the same directory with the same name.") + .allowableValues("true", "false") + .defaultValue("false") + .required(true) + .build(); + + public static final PropertyDescriptor UMASK = new PropertyDescriptor.Builder() + .name("permissions-umask") + .displayName("Permissions umask") + .description("A umask represented as an octal number which determines the permissions of files written to HDFS. " + + "This overrides the Hadoop Configuration dfs.umaskmode") + .addValidator(HadoopValidators.UMASK_VALIDATOR) + .build(); + + public static final PropertyDescriptor REMOTE_OWNER = new PropertyDescriptor.Builder() + .name("remote-owner") + .displayName("Remote Owner") + .description("Changes the owner of the HDFS file to this value after it is written. 
" + + "This only works if NiFi is running as a user that has HDFS super user privilege to change owner") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor REMOTE_GROUP = new PropertyDescriptor.Builder() + .name("remote-group") + .displayName("Remote Group") + .description("Changes the group of the HDFS file to this value after it is written. " + + "This only works if NiFi is running as a user that has HDFS super user privilege to change group") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() + .name("record-reader") + .displayName("Record Reader") + .description("The service for reading records from incoming flow files.") + .identifiesControllerService(RecordReaderFactory.class) + .required(true) + .build(); + + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Flow Files that have been successfully processed are transferred to this relationship") + .build(); + + public static final Relationship REL_RETRY = new Relationship.Builder() + .name("retry") + .description("Flow Files that could not be processed due to issues that can be retried are transferred to this relationship") + .build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("Flow Files that could not be processed due to issue that cannot be retried are transferred to this relationship") + .build(); + + public static final String RECORD_COUNT_ATTR = "record.count"; + + private volatile String remoteOwner; + private volatile String remoteGroup; + private volatile SchemaAccessStrategy schemaAccessStrategy; + + private volatile Set<Relationship> putHdfsRecordRelationships; + private volatile List<PropertyDescriptor> putHdfsRecordProperties; + + private final List<AllowableValue> strategyList = 
Collections.unmodifiableList(Arrays.asList( + SCHEMA_NAME_PROPERTY, + SCHEMA_TEXT_PROPERTY, + HWX_SCHEMA_REF_ATTRIBUTES, + HWX_CONTENT_ENCODED_SCHEMA + )); + + + @Override + protected final void init(final ProcessorInitializationContext context) { + super.init(context); + + final Set<Relationship> rels = new HashSet<>(); + rels.add(REL_SUCCESS); + rels.add(REL_RETRY); + rels.add(REL_FAILURE); + this.putHdfsRecordRelationships = Collections.unmodifiableSet(rels); + + final List<PropertyDescriptor> props = new ArrayList<>(properties); + props.add(RECORD_READER); + + props.add(new PropertyDescriptor.Builder() + .fromPropertyDescriptor(DIRECTORY) + .description("The parent directory to which files should be written. Will be created if it doesn't exist.") + .build()); + + final AllowableValue[] strategies = getSchemaAccessStrategyValues().toArray(new AllowableValue[0]); + + props.add(new PropertyDescriptor.Builder() + .fromPropertyDescriptor(SCHEMA_ACCESS_STRATEGY) + .description("Specifies how to obtain the schema that is to be used for writing the data.") + .allowableValues(strategies) + .defaultValue(getDefaultSchemaAccessStrategy().getValue()) + .build()); + + props.add(SCHEMA_REGISTRY); + props.add(SCHEMA_NAME); + props.add(SCHEMA_TEXT); + + final AllowableValue[] compressionTypes = getCompressionTypes(context).toArray(new AllowableValue[0]); + + props.add(new PropertyDescriptor.Builder() + .fromPropertyDescriptor(COMPRESSION_TYPE) + .allowableValues(compressionTypes) + .defaultValue(getDefaultCompressionType(context)) + .build()); + + props.add(OVERWRITE); + props.add(UMASK); + props.add(REMOTE_GROUP); + props.add(REMOTE_OWNER); + props.addAll(getAdditionalProperties()); + this.putHdfsRecordProperties = Collections.unmodifiableList(props); + } + + protected List<AllowableValue> getSchemaAccessStrategyValues() { + return strategyList; + } + + protected AllowableValue getDefaultSchemaAccessStrategy() { + return SCHEMA_NAME_PROPERTY; + } + + private 
PropertyDescriptor getSchemaAcessStrategyDescriptor() { + return getPropertyDescriptor(SCHEMA_ACCESS_STRATEGY.getName()); + } + + /** + * @param context the initialization context + * @return the possible compression types + */ + public abstract List<AllowableValue> getCompressionTypes(final ProcessorInitializationContext context); + + /** + * @param context the initialization context + * @return the default compression type + */ + public abstract String getDefaultCompressionType(final ProcessorInitializationContext context); + + /** + * Allows sub-classes to add additional properties, called from initialize. + * + * @return additional properties to add to the overall list + */ + public List<PropertyDescriptor> getAdditionalProperties() { + return Collections.emptyList(); + } + + @Override + public final Set<Relationship> getRelationships() { + return putHdfsRecordRelationships; + } + + @Override + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return putHdfsRecordProperties; + } + + @Override + protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { + final String schemaAccessStrategy = validationContext.getProperty(getSchemaAcessStrategyDescriptor()).getValue(); + return SchemaAccessUtils.validateSchemaAccessStrategy(validationContext, schemaAccessStrategy, getSchemaAccessStrategyValues()); + } + + @OnScheduled + public final void onScheduled(final ProcessContext context) throws IOException { + super.abstractOnScheduled(context); + + final SchemaRegistry schemaRegistry = context.getProperty(SCHEMA_REGISTRY).asControllerService(SchemaRegistry.class); + + final PropertyDescriptor descriptor = getPropertyDescriptor(SCHEMA_ACCESS_STRATEGY.getName()); + final String schemaAccess = context.getProperty(descriptor).getValue(); + this.schemaAccessStrategy = SchemaAccessUtils.getSchemaAccessStrategy(schemaAccess, schemaRegistry, context); + + this.remoteOwner = context.getProperty(REMOTE_OWNER).getValue(); + 
this.remoteGroup = context.getProperty(REMOTE_GROUP).getValue(); + + // Set umask once, to avoid thread safety issues doing it in onTrigger + final PropertyValue umaskProp = context.getProperty(UMASK); + final short dfsUmask; + if (umaskProp.isSet()) { + dfsUmask = Short.parseShort(umaskProp.getValue(), 8); + } else { + dfsUmask = FsPermission.DEFAULT_UMASK; + } + final Configuration conf = getConfiguration(); + FsPermission.setUMask(conf, new FsPermission(dfsUmask)); + } + + /** + * Sub-classes provide the appropriate HDFSRecordWriter. + * + * @param context the process context to obtain additional configuration + * @param flowFile the flow file being written + * @param conf the Configuration instance + * @param path the path to write to + * @param schema the schema for writing + * @return the HDFSRecordWriter + * @throws IOException if an error occurs creating the writer or processing the schema + */ + public abstract HDFSRecordWriter createHDFSRecordWriter( + final ProcessContext context, + final FlowFile flowFile, + final Configuration conf, + final Path path, + final RecordSchema schema) throws IOException, SchemaNotFoundException; + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + // do this before getting a flow file so that we always get a chance to attempt Kerberos relogin + final FileSystem fileSystem = getFileSystem(); + final Configuration configuration = getConfiguration(); + final UserGroupInformation ugi = getUserGroupInformation(); + + if (configuration == null || fileSystem == null || ugi == null) { + getLogger().error("Processor not configured properly because Configuration, FileSystem, or UserGroupInformation was null"); + context.yield(); + return; + } + + final FlowFile flowFile = session.get(); + if (flowFile == null) { + context.yield(); + return; + } + + ugi.doAs((PrivilegedAction<Object>)() -> { + Path tempDotCopyFile = null; + FlowFile putFlowFile = flowFile; + try 
{ + final String filenameValue = putFlowFile.getAttribute(CoreAttributes.FILENAME.key()); // TODO codec extension + final String directoryValue = context.getProperty(DIRECTORY).evaluateAttributeExpressions(putFlowFile).getValue(); + + // create the directory if it doesn't exist + final Path directoryPath = new Path(directoryValue); + createDirectory(fileSystem, directoryPath, remoteOwner, remoteGroup); + + // write to tempFile first and on success rename to destFile + final Path tempFile = new Path(directoryPath, "." + filenameValue); + final Path destFile = new Path(directoryPath, filenameValue); + + final boolean destinationExists = fileSystem.exists(destFile) || fileSystem.exists(tempFile); + final boolean shouldOverwrite = context.getProperty(OVERWRITE).asBoolean(); + + // if the tempFile or destFile already exist, and overwrite is set to false, then transfer to failure + if (destinationExists && !shouldOverwrite) { + session.transfer(session.penalize(putFlowFile), REL_FAILURE); + getLogger().warn("penalizing {} and routing to failure because file with same name already exists", new Object[]{putFlowFile}); + return null; + } + + final AtomicReference<Throwable> exceptionHolder = new AtomicReference<>(null); + final AtomicReference<WriteResult> writeResult = new AtomicReference<>(); + final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); + + final FlowFile flowFileIn = putFlowFile; + final StopWatch stopWatch = new StopWatch(true); + + // Read records from the incoming FlowFile and write them the tempFile + session.read(putFlowFile, (final InputStream rawIn) -> { + RecordReader recordReader = null; + HDFSRecordWriter recordWriter = null; + + try (final BufferedInputStream in = new BufferedInputStream(rawIn)) { + final RecordSchema destRecordSchema = schemaAccessStrategy.getSchema(flowFile, in); + recordWriter = createHDFSRecordWriter(context, flowFile, configuration, tempFile, 
destRecordSchema); + + // if we fail to create the RecordReader then we want to route to failure, so we need to + // handle this separately from the other IOExceptions which normally route to retry + try { + recordReader = recordReaderFactory.createRecordReader(flowFileIn, in, getLogger()); + } catch (Exception e) { + final RecordReaderFactoryException rrfe = new RecordReaderFactoryException("Unable to create RecordReader", e); + exceptionHolder.set(rrfe); + return; + } + + final RecordSet recordSet = recordReader.createRecordSet(); + writeResult.set(recordWriter.write(recordSet)); + + } catch (Exception e) { + exceptionHolder.set(e); + } finally { + IOUtils.closeQuietly(recordReader); + IOUtils.closeQuietly(recordWriter); + } + }); + stopWatch.stop(); + + final String dataRate = stopWatch.calculateDataRate(putFlowFile.getSize()); + final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS); + tempDotCopyFile = tempFile; + + // if any errors happened within the session.read then throw the exception so we jump + // into one of the appropriate catch blocks below + if (exceptionHolder.get() != null) { + throw exceptionHolder.get(); + } + + // Attempt to rename from the tempFile to destFile, and change owner if successfully renamed + rename(fileSystem, tempFile, destFile); + changeOwner(fileSystem, destFile, remoteOwner, remoteGroup); + + getLogger().info("Wrote {} to {} in {} milliseconds at a rate of {}", new Object[]{putFlowFile, destFile, millis, dataRate}); + + putFlowFile = postProcess(context, session, putFlowFile, destFile); + + final String outputPath = destFile.toString(); + final String newFilename = destFile.getName(); + final String hdfsPath = destFile.getParent().toString(); + + // Update the filename and absolute path attributes + final Map<String,String> attributes = new HashMap<>(writeResult.get().getAttributes()); + attributes.put(CoreAttributes.FILENAME.key(), newFilename); + attributes.put(ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath); + 
attributes.put(RECORD_COUNT_ATTR, String.valueOf(writeResult.get().getRecordCount())); + putFlowFile = session.putAllAttributes(putFlowFile, attributes); + + // Send a provenance event and transfer to success + final String transitUri = (outputPath.startsWith("/")) ? "hdfs:/" + outputPath : "hdfs://" + outputPath; + session.getProvenanceReporter().send(putFlowFile, transitUri); + session.transfer(putFlowFile, REL_SUCCESS); + + } catch (IOException | FlowFileAccessException e) { + deleteQuietly(fileSystem, tempDotCopyFile); + getLogger().error("Failed to write due to {}", new Object[]{e}); + session.transfer(session.penalize(putFlowFile), REL_RETRY); + context.yield(); + } catch (Throwable t) { + deleteQuietly(fileSystem, tempDotCopyFile); + getLogger().error("Failed to write due to {}", new Object[]{t}); + session.transfer(putFlowFile, REL_FAILURE); + } + + return null; + }); + } + + /** + * This method will be called after successfully writing to the destination file and renaming the file to it's final name + * in order to give sub-classes a chance to take action before transferring to success. + * + * @param context the context + * @param session the session + * @param flowFile the flow file being processed + * @param destFile the destination file written to + * @return an updated FlowFile reference + */ + protected FlowFile postProcess(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, final Path destFile) { + return flowFile; + } + + /** + * Attempts to rename srcFile to destFile up to 10 times, with a 200ms sleep in between each attempt. + * + * If the file has not been renamed after 10 attempts, a FailureException is thrown. 
+ * + * @param fileSystem the file system where the files are located + * @param srcFile the source file + * @param destFile the destination file to rename the source to + * @throws IOException if IOException happens while attempting to rename + * @throws InterruptedException if renaming is interrupted + * @throws FailureException if the file couldn't be renamed after 10 attempts + */ + protected void rename(final FileSystem fileSystem, final Path srcFile, final Path destFile) throws IOException, InterruptedException, FailureException { + boolean renamed = false; + for (int i = 0; i < 10; i++) { // try to rename multiple times. + if (fileSystem.rename(srcFile, destFile)) { + renamed = true; + break;// rename was successful + } + Thread.sleep(200L);// try waiting to let whatever might cause rename failure to resolve + } + if (!renamed) { + fileSystem.delete(srcFile, false); + throw new FailureException("Could not rename file " + srcFile + " to its final filename"); + } + } + + /** + * Deletes the given file from the given filesystem. Any exceptions that are encountered will be caught and logged, but not thrown. + * + * @param fileSystem the filesystem where the file exists + * @param file the file to delete + */ + protected void deleteQuietly(final FileSystem fileSystem, final Path file) { + if (file != null) { + try { + fileSystem.delete(file, false); + } catch (Exception e) { + getLogger().error("Unable to remove file {} due to {}", new Object[]{file, e}); + } + } + } + + /** + * Changes the ownership of the given file. 
+ * + * @param fileSystem the filesystem where the file exists + * @param path the file to change ownership on + * @param remoteOwner the new owner for the file + * @param remoteGroup the new group for the file + */ + protected void changeOwner(final FileSystem fileSystem, final Path path, final String remoteOwner, final String remoteGroup) { + try { + // Change owner and group of file if configured to do so + if (remoteOwner != null || remoteGroup != null) { + fileSystem.setOwner(path, remoteOwner, remoteGroup); + } + } catch (Exception e) { + getLogger().warn("Could not change owner or group of {} on due to {}", new Object[]{path, e}); + } + } + + /** + * Creates the given directory and changes the ownership to the specified owner/group. + * + * @param fileSystem the filesystem to create the directory on + * @param directory the directory to create + * @param remoteOwner the owner for changing ownership of the directory + * @param remoteGroup the group for changing ownership of the directory + * @throws IOException if an error occurs obtaining the file status or issuing the mkdir command + * @throws FailureException if the directory could not be created + */ + protected void createDirectory(final FileSystem fileSystem, final Path directory, final String remoteOwner, final String remoteGroup) throws IOException, FailureException { + try { + if (!fileSystem.getFileStatus(directory).isDirectory()) { + throw new FailureException(directory.toString() + " already exists and is not a directory"); + } + } catch (FileNotFoundException fe) { + if (!fileSystem.mkdirs(directory)) { + throw new FailureException(directory.toString() + " could not be created"); + } + changeOwner(fileSystem, directory, remoteOwner, remoteGroup); + } + } + +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/exception/FailureException.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/exception/FailureException.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/exception/FailureException.java
new file mode 100644
index 0000000..02fa295
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/exception/FailureException.java
@@ -0,0 +1,32 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processors.hadoop.exception;

/**
 * An exception to represent an error that occurred during a put to HDFS and should not be retried.
 */
public class FailureException extends Exception {

    /**
     * @param message a description of the non-retryable failure
     */
    public FailureException(String message) {
        super(message);
    }

    /**
     * @param message a description of the non-retryable failure
     * @param cause the underlying cause of the failure
     */
    public FailureException(String message, Throwable cause) {
        super(message, cause);
    }

}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/exception/InvalidSchemaException.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/exception/InvalidSchemaException.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/exception/InvalidSchemaException.java
new file mode 100644
index 0000000..e0f967f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/exception/InvalidSchemaException.java
@@ -0,0 +1,31 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processors.hadoop.exception;

/**
 * Thrown when a schema is unable to be parsed into the expected type.
 */
public class InvalidSchemaException extends FailureException {

    /**
     * @param message a description of why the schema is invalid
     */
    public InvalidSchemaException(String message) {
        super(message);
    }

    /**
     * @param message a description of why the schema is invalid
     * @param cause the underlying parsing error
     */
    public InvalidSchemaException(String message, Throwable cause) {
        super(message, cause);
    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/exception/RecordReaderFactoryException.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/exception/RecordReaderFactoryException.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/exception/RecordReaderFactoryException.java
new file mode 100644
index 0000000..83a5b41
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/exception/RecordReaderFactoryException.java
@@ -0,0 +1,31 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processors.hadoop.exception;

/**
 * Thrown when an error occurs while using the record reader factory to create a record reader.
 */
public class RecordReaderFactoryException extends FailureException {

    /**
     * @param message a description of the record reader creation failure
     */
    public RecordReaderFactoryException(String message) {
        super(message);
    }

    /**
     * @param message a description of the record reader creation failure
     * @param cause the underlying exception thrown by the factory
     */
    public RecordReaderFactoryException(String message, Throwable cause) {
        super(message, cause);
    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/record/HDFSRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/record/HDFSRecordReader.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/record/HDFSRecordReader.java
new file mode 100644
index 0000000..f83d7ff
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/record/HDFSRecordReader.java
@@ -0,0 +1,31 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processors.hadoop.record;

import org.apache.nifi.serialization.record.Record;

import java.io.Closeable;
import java.io.IOException;

/**
 * Reads Records from HDFS.
 */
public interface HDFSRecordReader extends Closeable {

    /**
     * @return the next record; presumably null when no records remain — TODO confirm against implementations
     * @throws IOException if an I/O error happens while reading
     */
    Record nextRecord() throws IOException;

}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/record/HDFSRecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/record/HDFSRecordWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/record/HDFSRecordWriter.java
new file mode 100644
index 0000000..e21665f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/record/HDFSRecordWriter.java
@@ -0,0 +1,55 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.
package org.apache.nifi.processors.hadoop.record;

import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSet;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;

/**
 * Writes Records to HDFS.
 */
public interface HDFSRecordWriter extends Closeable {

    /**
     * Writes a single record.
     *
     * @param record the record to write
     * @throws IOException if an I/O error happens writing the record
     */
    void write(Record record) throws IOException;

    /**
     * Drains the given RecordSet, writing each record in turn via {@link #write(Record)}.
     *
     * @param recordSet the RecordSet to write
     * @return the result of writing the record set
     * @throws IOException if an I/O error happens reading from the RecordSet, or writing a Record
     */
    default WriteResult write(final RecordSet recordSet) throws IOException {
        int written = 0;
        for (Record next = recordSet.next(); next != null; next = recordSet.next()) {
            write(next);
            written++;
        }
        return WriteResult.of(written, Collections.emptyMap());
    }

}
<?xml version="1.0"?>
<!--
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements.  See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License.  You may obtain a copy of the License at
      http://www.apache.org/licenses/LICENSE-2.0
  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->
<!-- Mock Record reader/writer implementations shared by unit tests of record-based processors. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-record-utils</artifactId>
        <version>1.2.0-SNAPSHOT</version>
    </parent>

    <artifactId>nifi-mock-record-utils</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-api</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-utils</artifactId>
        </dependency>
        <!-- Other modules using nifi-standard-record-utils are expected to have this API available, typically through a NAR dependency -->
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-record-serialization-service-api</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-record</artifactId>
        </dependency>
    </dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java new file mode 100644 index 0000000..e3ed23e --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.nifi.serialization.record;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;

/**
 * Mock RecordReaderFactory for unit tests: the schema is registered up front via
 * {@link #addSchemaField(String, RecordFieldType)} and records via {@link #addRecord(Object...)},
 * and the created reader replays them. Optionally throws a MalformedRecordException
 * after a configured number of records have been read successfully.
 */
public class MockRecordParser extends AbstractControllerService implements RecordReaderFactory {
    private final List<Object[]> records = new ArrayList<>();
    private final List<RecordField> fields = new ArrayList<>();
    // Number of records to return successfully before failing; -1 means never fail.
    private final int failAfterN;

    public MockRecordParser() {
        this(-1);
    }

    public MockRecordParser(final int failAfterN) {
        this.failAfterN = failAfterN;
    }

    /**
     * Adds a field to the schema that created readers will report.
     */
    public void addSchemaField(final String fieldName, final RecordFieldType type) {
        fields.add(new RecordField(fieldName, type.getDataType()));
    }

    /**
     * Adds a record (one value per schema field, in field order) for the reader to return.
     */
    public void addRecord(Object... values) {
        records.add(values);
    }

    @Override
    public RecordReader createRecordReader(FlowFile flowFile, InputStream in, ComponentLog logger) throws IOException, SchemaNotFoundException {
        final Iterator<Object[]> itr = records.iterator();

        return new RecordReader() {
            private int recordCount = 0;

            @Override
            public void close() throws IOException {
            }

            @Override
            public Record nextRecord() throws IOException, MalformedRecordException {
                // BUG FIX: the original condition was (failAfterN >= recordCount), which throws
                // on the very first read whenever failAfterN >= 0 — before any record has been
                // returned, and with a message claiming "0 records have been read". The intent
                // is to fail only after failAfterN records have been read successfully.
                if (failAfterN > -1 && recordCount >= failAfterN) {
                    throw new MalformedRecordException("Intentional Unit Test Exception because " + recordCount + " records have been read");
                }
                recordCount++;

                if (!itr.hasNext()) {
                    return null;
                }

                final Object[] values = itr.next();
                final Map<String, Object> valueMap = new HashMap<>();
                int i = 0;
                for (final RecordField field : fields) {
                    final String fieldName = field.getFieldName();
                    // Guard against records registered with fewer values than schema fields:
                    // missing trailing values map to null rather than ArrayIndexOutOfBounds.
                    valueMap.put(fieldName, i < values.length ? values[i] : null);
                    i++;
                }

                return new MapRecord(new SimpleRecordSchema(fields), valueMap);
            }

            @Override
            public RecordSchema getSchema() {
                return new SimpleRecordSchema(fields);
            }
        };
    }
}
package org.apache.nifi.serialization.record;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;

import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;

/**
 * Mock RecordSetWriterFactory for unit tests: writes records as simple CSV-like lines,
 * preceded by a fixed header line. Optionally quotes values and can be configured to
 * throw an IOException after a given number of records have been written.
 */
public class MockRecordWriter extends AbstractControllerService implements RecordSetWriterFactory {
    private final String header;
    // Number of records to write successfully before failing; -1 means never fail.
    private final int failAfterN;
    private final boolean quoteValues;

    public MockRecordWriter(final String header) {
        this(header, true, -1);
    }

    public MockRecordWriter(final String header, final boolean quoteValues) {
        this(header, quoteValues, -1);
    }

    public MockRecordWriter(final String header, final boolean quoteValues, final int failAfterN) {
        this.header = header;
        this.quoteValues = quoteValues;
        this.failAfterN = failAfterN;
    }

    @Override
    public RecordSetWriter createWriter(final ComponentLog logger, final FlowFile flowFile, final InputStream in) {
        return new RecordSetWriter() {
            @Override
            public WriteResult write(final RecordSet rs, final OutputStream out) throws IOException {
                // Use an explicit charset rather than the platform default.
                out.write(header.getBytes(StandardCharsets.UTF_8));
                out.write("\n".getBytes(StandardCharsets.UTF_8));

                int recordCount = 0;
                Record record = null;
                while ((record = rs.next()) != null) {
                    if (++recordCount > failAfterN && failAfterN > -1) {
                        throw new IOException("Unit Test intentionally throwing IOException after " + failAfterN + " records were written");
                    }

                    writeRecordLine(record, out);
                }

                return WriteResult.of(recordCount, Collections.emptyMap());
            }

            @Override
            public String getMimeType() {
                return "text/plain";
            }

            @Override
            public WriteResult write(Record record, OutputStream out) throws IOException {
                out.write(header.getBytes(StandardCharsets.UTF_8));
                out.write("\n".getBytes(StandardCharsets.UTF_8));

                // BUG FIX: this path previously called val.getBytes() without a null check,
                // throwing NullPointerException for a null field value while the RecordSet
                // path handled null. Both paths now share writeRecordLine.
                writeRecordLine(record, out);

                return WriteResult.of(1, Collections.emptyMap());
            }

            // Writes one record as comma-separated (optionally quoted) values plus a newline.
            // Null field values render as empty (quoted: "", unquoted: nothing).
            private void writeRecordLine(final Record record, final OutputStream out) throws IOException {
                final int numCols = record.getSchema().getFieldCount();

                int i = 0;
                for (final String fieldName : record.getSchema().getFieldNames()) {
                    final String val = record.getAsString(fieldName);
                    if (quoteValues) {
                        out.write("\"".getBytes(StandardCharsets.UTF_8));
                        if (val != null) {
                            out.write(val.getBytes(StandardCharsets.UTF_8));
                        }
                        out.write("\"".getBytes(StandardCharsets.UTF_8));
                    } else if (val != null) {
                        out.write(val.getBytes(StandardCharsets.UTF_8));
                    }

                    if (i++ < numCols - 1) {
                        out.write(",".getBytes(StandardCharsets.UTF_8));
                    }
                }
                out.write("\n".getBytes(StandardCharsets.UTF_8));
            }
        };
    }
}
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/pom.xml new file mode 100644 index 0000000..3aa27a7 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/pom.xml @@ -0,0 +1,45 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
<!-- Shared schema-access utilities for record-based readers and writers. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-record-utils</artifactId>
        <version>1.2.0-SNAPSHOT</version>
    </parent>

    <artifactId>nifi-standard-record-utils</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-api</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-utils</artifactId>
        </dependency>
        <!-- Other modules using nifi-standard-record-utils are expected to have these APIs available, typically through a NAR dependency -->
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-schema-registry-service-api</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-record</artifactId>
        </dependency>
    </dependencies>
</project>
package org.apache.nifi.schema.access;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.serialization.record.RecordSchema;

import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/**
 * SchemaAccessStrategy that resolves a RecordSchema from three FlowFile attributes written
 * in the Hortonworks Schema Registry style: a numeric schema identifier, a numeric schema
 * version, and a protocol version (only protocol version 1 is supported).
 */
public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccessStrategy {

    public static final String SCHEMA_ID_ATTRIBUTE = "schema.identifier";
    public static final String SCHEMA_VERSION_ATTRIBUTE = "schema.version";
    public static final String SCHEMA_PROTOCOL_VERSION_ATTRIBUTE = "schema.protocol.version";

    // Only this protocol version is currently understood.
    private static final int SUPPORTED_PROTOCOL_VERSION = 1;

    private final Set<SchemaField> schemaFields;
    private final SchemaRegistry schemaRegistry;

    public HortonworksAttributeSchemaReferenceStrategy(final SchemaRegistry schemaRegistry) {
        this.schemaRegistry = schemaRegistry;

        schemaFields = new HashSet<>();
        schemaFields.add(SchemaField.SCHEMA_IDENTIFIER);
        schemaFields.add(SchemaField.SCHEMA_VERSION);
        schemaFields.addAll(schemaRegistry == null ? Collections.emptySet() : schemaRegistry.getSuppliedSchemaFields());
    }

    @Override
    public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException, IOException {
        final String schemaIdentifier = flowFile.getAttribute(SCHEMA_ID_ATTRIBUTE);
        final String schemaVersion = flowFile.getAttribute(SCHEMA_VERSION_ATTRIBUTE);
        final String schemaProtocol = flowFile.getAttribute(SCHEMA_PROTOCOL_VERSION_ATTRIBUTE);
        if (schemaIdentifier == null || schemaVersion == null || schemaProtocol == null) {
            throw new SchemaNotFoundException("Could not determine Schema for " + flowFile + " because it is missing one of the following three required attributes: "
                + SCHEMA_ID_ATTRIBUTE + ", " + SCHEMA_VERSION_ATTRIBUTE + ", " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE);
        }

        if (!isNumber(schemaProtocol)) {
            throw new SchemaNotFoundException("Could not determine Schema for " + flowFile + " because the " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE + " has a value of '"
                + schemaProtocol + "', which is not a valid Protocol Version number");
        }

        final int protocol = Integer.parseInt(schemaProtocol);
        if (protocol != SUPPORTED_PROTOCOL_VERSION) {
            throw new SchemaNotFoundException("Could not determine Schema for " + flowFile + " because the " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE + " has a value of '"
                + schemaProtocol + "', which is not a valid Protocol Version number. Expected Protocol Version to be " + SUPPORTED_PROTOCOL_VERSION + ".");
        }

        // BUG FIX: the following two messages previously quoted the protocol value instead of
        // the identifier/version value that actually failed validation.
        if (!isNumber(schemaIdentifier)) {
            throw new SchemaNotFoundException("Could not determine Schema for " + flowFile + " because the " + SCHEMA_ID_ATTRIBUTE + " has a value of '"
                + schemaIdentifier + "', which is not a valid Schema Identifier number");
        }

        if (!isNumber(schemaVersion)) {
            throw new SchemaNotFoundException("Could not determine Schema for " + flowFile + " because the " + SCHEMA_VERSION_ATTRIBUTE + " has a value of '"
                + schemaVersion + "', which is not a valid Schema Version number");
        }

        final long schemaId = Long.parseLong(schemaIdentifier);
        final int version = Integer.parseInt(schemaVersion);

        final RecordSchema schema = schemaRegistry.retrieveSchema(schemaId, version);
        if (schema == null) {
            throw new SchemaNotFoundException("Could not find a Schema in the Schema Registry with Schema Identifier '" + schemaId + "' and Version '" + version + "'");
        }

        return schema;
    }

    /**
     * Returns true only if the value consists entirely of ASCII digits. Deliberately does NOT
     * trim, so that anything this accepts is also accepted by Long.parseLong/Integer.parseInt.
     *
     * BUG FIX: the original computed a trimmed copy but then tested value.isEmpty() and indexed
     * value.charAt(i) against trimmed.length(), so all-whitespace or padded values could pass
     * the guard and later blow up in parseLong with a NumberFormatException instead of the
     * intended SchemaNotFoundException.
     */
    private static boolean isNumber(final String value) {
        if (value == null || value.isEmpty()) {
            return false;
        }

        for (int i = 0; i < value.length(); i++) {
            final char c = value.charAt(i);
            if (c > '9' || c < '0') {
                return false;
            }
        }

        return true;
    }

    @Override
    public Set<SchemaField> getSuppliedSchemaFields() {
        return schemaFields;
    }
}
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java new file mode 100644 index 0000000..dd7d676 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.nifi.schema.access;

import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;

import java.io.IOException;
import java.io.OutputStream;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * SchemaAccessWriter that exposes the schema reference as FlowFile attributes
 * (identifier, version, and protocol version) rather than embedding anything in
 * the content stream.
 */
public class HortonworksAttributeSchemaReferenceWriter implements SchemaAccessWriter {
    private static final Set<SchemaField> requiredSchemaFields = EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);
    private static final int LATEST_PROTOCOL_VERSION = 1;

    @Override
    public void writeHeader(RecordSchema schema, OutputStream out) throws IOException {
        // Nothing is written to the content; the reference travels as attributes.
    }

    @Override
    public Map<String, String> getAttributes(final RecordSchema schema) {
        final SchemaIdentifier schemaId = schema.getIdentifier();

        final Map<String, String> attrs = new HashMap<>(4);
        attrs.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE,
            String.valueOf(schemaId.getIdentifier().getAsLong()));
        attrs.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE,
            String.valueOf(schemaId.getVersion().getAsInt()));
        attrs.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE,
            String.valueOf(LATEST_PROTOCOL_VERSION));
        return attrs;
    }

    @Override
    public void validateSchema(final RecordSchema schema) throws SchemaNotFoundException {
        // Both the identifier and the version must be known before attributes can be written.
        final SchemaIdentifier schemaId = schema.getIdentifier();
        if (!schemaId.getIdentifier().isPresent()) {
            throw new SchemaNotFoundException("Cannot write Schema Reference as Attributes because it does not contain a Schema Identifier");
        }
        if (!schemaId.getVersion().isPresent()) {
            throw new SchemaNotFoundException("Cannot write Schema Reference as Attributes because it does not contain a Schema Version");
        }
    }

    @Override
    public Set<SchemaField> getRequiredSchemaFields() {
        return requiredSchemaFields;
    }

}
package org.apache.nifi.schema.access;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.stream.io.StreamUtils;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/**
 * SchemaAccessStrategy that reads a Hortonworks Schema Registry style binary header from the
 * start of the content stream: 1 byte of protocol version, 8 bytes of schema identifier, and
 * 4 bytes of schema version, then resolves the schema from the configured registry.
 */
public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessStrategy {
    private static final int LATEST_PROTOCOL_VERSION = 1;
    // 1-byte protocol version + 8-byte schema id + 4-byte schema version.
    private static final int HEADER_LENGTH = 13;

    private final Set<SchemaField> schemaFields;
    private final SchemaRegistry schemaRegistry;

    public HortonworksEncodedSchemaReferenceStrategy(final SchemaRegistry schemaRegistry) {
        this.schemaRegistry = schemaRegistry;

        schemaFields = new HashSet<>();
        schemaFields.add(SchemaField.SCHEMA_IDENTIFIER);
        schemaFields.add(SchemaField.SCHEMA_VERSION);
        schemaFields.addAll(schemaRegistry == null ? Collections.emptySet() : schemaRegistry.getSuppliedSchemaFields());
    }

    @Override
    public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException, IOException {
        final byte[] buffer = new byte[HEADER_LENGTH];
        try {
            StreamUtils.fillBuffer(contentStream, buffer);
        } catch (final IOException ioe) {
            throw new SchemaNotFoundException("Could not read first " + HEADER_LENGTH + " bytes from stream", ioe);
        }

        // This encoding follows the pattern that is provided for serializing data by the Hortonworks Schema Registry serializer
        // as it is provided at:
        // https://github.com/hortonworks/registry/blob/master/schema-registry/serdes/src/main/java/com/hortonworks/registries/schemaregistry/serdes/avro/AvroSnapshotSerializer.java
        final ByteBuffer bb = ByteBuffer.wrap(buffer);
        final int protocolVersion = bb.get();
        // CONSISTENCY FIX: compare against the named constant instead of the literal 1,
        // which the constant was declared for.
        if (protocolVersion != LATEST_PROTOCOL_VERSION) {
            throw new SchemaNotFoundException("Schema Encoding appears to be of an incompatible version. The latest known Protocol is Version "
                + LATEST_PROTOCOL_VERSION + " but the data was encoded with version " + protocolVersion);
        }

        final long schemaId = bb.getLong();
        final int schemaVersion = bb.getInt();

        return schemaRegistry.retrieveSchema(schemaId, schemaVersion);
    }

    @Override
    public Set<SchemaField> getSuppliedSchemaFields() {
        return schemaFields;
    }
}
package org.apache.nifi.schema.access;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;

/**
 * SchemaAccessWriter that prepends a Hortonworks Schema Registry style binary header to
 * the content: 1 byte of protocol version, 8 bytes of schema identifier, and 4 bytes of
 * schema version. No attributes are written.
 */
public class HortonworksEncodedSchemaReferenceWriter implements SchemaAccessWriter {
    private static final Set<SchemaField> requiredSchemaFields = EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);
    private static final int LATEST_PROTOCOL_VERSION = 1;

    @Override
    public void writeHeader(final RecordSchema schema, final OutputStream out) throws IOException {
        final SchemaIdentifier identifier = schema.getIdentifier();

        // This layout follows the pattern used for serializing data by the Hortonworks Schema
        // Registry serializer, as provided at:
        // https://github.com/hortonworks/registry/blob/master/schema-registry/serdes/src/main/java/com/hortonworks/registries/schemaregistry/serdes/avro/AvroSnapshotSerializer.java
        final ByteBuffer header = ByteBuffer.allocate(13)
            .put((byte) LATEST_PROTOCOL_VERSION)
            .putLong(identifier.getIdentifier().getAsLong())
            .putInt(identifier.getVersion().getAsInt());

        out.write(header.array());
    }

    @Override
    public Map<String, String> getAttributes(final RecordSchema schema) {
        // The reference is embedded in the content, so no attributes are needed.
        return Collections.emptyMap();
    }

    @Override
    public void validateSchema(RecordSchema schema) throws SchemaNotFoundException {
        final SchemaIdentifier identifier = schema.getIdentifier();
        if (!identifier.getIdentifier().isPresent()) {
            throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Identifier is not known");
        }
        if (!identifier.getVersion().isPresent()) {
            throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Version is not known");
        }
    }

    @Override
    public Set<SchemaField> getRequiredSchemaFields() {
        return requiredSchemaFields;
    }

}
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.schema.access;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.serialization.record.RecordSchema;

import java.io.IOException;
import java.io.InputStream;
import java.util.Set;

/**
 * Strategy for determining the RecordSchema that applies to a FlowFile, e.g. from its
 * attributes, its content, or an external Schema Registry.
 */
public interface SchemaAccessStrategy {
    /**
     * Returns the schema for the given FlowFile using the supplied stream of content and configuration
     *
     * @param flowFile flowfile
     * @param contentStream content of flowfile
     * @return the RecordSchema for the FlowFile
     * @throws SchemaNotFoundException if the schema cannot be determined for the FlowFile
     * @throws IOException if unable to read from the content stream
     */
    RecordSchema getSchema(FlowFile flowFile, InputStream contentStream) throws SchemaNotFoundException, IOException;

    /**
     * @return the set of all Schema Fields that are supplied by the RecordSchema that is returned from {@link #getSchema(FlowFile, InputStream)}.
     */
    Set<SchemaField> getSuppliedSchemaFields();
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessWriter.java
new file mode 100644
index 0000000..30a995c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessWriter.java
@@ -0,0 +1,63 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.schema.access;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;
import java.util.Set;

import org.apache.nifi.serialization.record.RecordSchema;

/**
 * Counterpart to SchemaAccessStrategy: conveys the schema of outgoing data either by writing
 * header information into the FlowFile content or by supplying FlowFile attributes.
 */
public interface SchemaAccessWriter {

    /**
     * Writes the given Record Schema to the given OutputStream as header information, if appropriate,
     * or returns without writing anything if the implementation does not need to write information to
     * the contents of the FlowFile
     *
     * @param schema the schema to write
     * @param out the OutputStream to write to
     * @throws IOException if unable to write to the given stream
     */
    void writeHeader(RecordSchema schema, OutputStream out) throws IOException;

    /**
     * Returns a Map of String to String that represent the attributes that should be added to the FlowFile, or
     * an empty map if no attributes should be added.
     *
     * @param schema the schema for which attributes are to be produced
     * @return a Map of attributes to add to the FlowFile.
     */
    Map<String, String> getAttributes(RecordSchema schema);

    /**
     * Ensures that the given schema can be written by this SchemaAccessWriter or throws SchemaNotFoundException if
     * the schema does not contain sufficient information to be written
     *
     * @param schema the schema to validate
     * @throws SchemaNotFoundException if the schema does not contain sufficient information to be written
     */
    void validateSchema(RecordSchema schema) throws SchemaNotFoundException;

    /**
     * Specifies the set of SchemaField's that are required in order to use this Schema Access Writer
     *
     * @return the set of SchemaField's that are required in order to use this Schema Access Writer
     */
    Set<SchemaField> getRequiredSchemaFields();
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNameAsAttribute.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNameAsAttribute.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNameAsAttribute.java
new file mode 100644
index 0000000..54a248d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNameAsAttribute.java
@@ -0,0 +1,62 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.schema.access;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;

/**
 * A SchemaAccessWriter that conveys the schema by placing its name into the "schema.name"
 * FlowFile attribute; nothing is written into the FlowFile content itself.
 */
public class SchemaNameAsAttribute implements SchemaAccessWriter {
    private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME);
    private static final String SCHEMA_NAME_ATTRIBUTE = "schema.name";

    /**
     * No content header is needed for this access strategy, so this is a no-op.
     */
    @Override
    public void writeHeader(final RecordSchema schema, final OutputStream out) throws IOException {
    }

    /**
     * @param schema the schema being written
     * @return a single-entry map carrying the schema name, or an empty map when the name is unknown
     */
    @Override
    public Map<String, String> getAttributes(final RecordSchema schema) {
        return schema.getIdentifier()
            .getName()
            .map(name -> Collections.singletonMap(SCHEMA_NAME_ATTRIBUTE, name))
            .orElseGet(Collections::emptyMap);
    }

    /**
     * The schema name is the only information this writer conveys, so it must be present.
     *
     * @param schema the schema to validate
     * @throws SchemaNotFoundException if the schema's name is not known
     */
    @Override
    public void validateSchema(final RecordSchema schema) throws SchemaNotFoundException {
        final Optional<String> nameOption = schema.getIdentifier().getName();
        if (nameOption.isPresent()) {
            return;
        }
        throw new SchemaNotFoundException("Cannot write Schema Name As Attribute because the Schema Name is not known");
    }

    @Override
    public Set<SchemaField> getRequiredSchemaFields() {
        return schemaFields;
    }

}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java new file mode 100644 index 0000000..d59e5da --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.schema.access; + +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.schemaregistry.services.SchemaRegistry; +import org.apache.nifi.serialization.record.RecordSchema; + +import java.io.InputStream; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +public class SchemaNamePropertyStrategy implements SchemaAccessStrategy { + private final Set<SchemaField> schemaFields; + + private final SchemaRegistry schemaRegistry; + private final PropertyValue schemaNamePropertyValue; + + public SchemaNamePropertyStrategy(final SchemaRegistry schemaRegistry, final PropertyValue schemaNamePropertyValue) { + this.schemaRegistry = schemaRegistry; + this.schemaNamePropertyValue = schemaNamePropertyValue; + + schemaFields = new HashSet<>(); + schemaFields.add(SchemaField.SCHEMA_NAME); + schemaFields.addAll(schemaRegistry == null ? Collections.emptySet() : schemaRegistry.getSuppliedSchemaFields()); + } + + @Override + public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException { + final String schemaName = schemaNamePropertyValue.evaluateAttributeExpressions(flowFile).getValue(); + if (schemaName.trim().isEmpty()) { + throw new SchemaNotFoundException("FlowFile did not contain appropriate attributes to determine Schema Name."); + } + + try { + final RecordSchema recordSchema = schemaRegistry.retrieveSchema(schemaName); + if (recordSchema == null) { + throw new SchemaNotFoundException("Could not find a schema with name '" + schemaName + "' in the configured Schema Registry"); + } + + return recordSchema; + } catch (final Exception e) { + throw new SchemaNotFoundException("Could not retrieve schema with name '" + schemaName + "' from the configured Schema Registry", e); + } + } + + @Override + public Set<SchemaField> getSuppliedSchemaFields() { + return schemaFields; + } +} 
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaTextAsAttribute.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaTextAsAttribute.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaTextAsAttribute.java new file mode 100644 index 0000000..f39bdca --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaTextAsAttribute.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.schema.access; + +import java.io.OutputStream; +import java.util.Collections; +import java.util.EnumSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import org.apache.nifi.serialization.record.RecordSchema; + +public class SchemaTextAsAttribute implements SchemaAccessWriter { + private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_TEXT, SchemaField.SCHEMA_TEXT_FORMAT); + + @Override + public void writeHeader(final RecordSchema schema, final OutputStream out) { + } + + @Override + public Map<String, String> getAttributes(final RecordSchema schema) { + final Optional<String> textFormatOption = schema.getSchemaFormat(); + final Optional<String> textOption = schema.getSchemaText(); + return Collections.singletonMap(textFormatOption.get() + ".schema", textOption.get()); + } + + @Override + public void validateSchema(final RecordSchema schema) throws SchemaNotFoundException { + final Optional<String> textFormatOption = schema.getSchemaFormat(); + if (!textFormatOption.isPresent()) { + throw new SchemaNotFoundException("Cannot write Schema Text as Attribute because the Schema's Text Format is not present"); + } + + final Optional<String> textOption = schema.getSchemaText(); + if (!textOption.isPresent()) { + throw new SchemaNotFoundException("Cannot write Schema Text as Attribute because the Schema's Text is not present"); + } + } + + @Override + public Set<SchemaField> getRequiredSchemaFields() { + return schemaFields; + } +}
