http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java deleted file mode 100644 index 65b11ff..0000000 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java +++ /dev/null @@ -1,575 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.processor.util.put; - -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.annotation.lifecycle.OnStopped; -import org.apache.nifi.components.AllowableValue; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.AbstractSessionFactoryProcessor; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.ProcessorInitializationContext; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processor.util.put.sender.ChannelSender; -import org.apache.nifi.processor.util.put.sender.DatagramChannelSender; -import org.apache.nifi.processor.util.put.sender.SSLSocketChannelSender; -import org.apache.nifi.processor.util.put.sender.SocketChannelSender; -import org.apache.nifi.ssl.SSLContextService; - -import javax.net.ssl.SSLContext; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; - -/** - * A base class for processors that send data to an external system using TCP or UDP. 
- */ -public abstract class AbstractPutEventProcessor extends AbstractSessionFactoryProcessor { - - public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder() - .name("Hostname") - .description("The ip address or hostname of the destination.") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .defaultValue("localhost") - .required(true) - .expressionLanguageSupported(true) - .build(); - public static final PropertyDescriptor PORT = new PropertyDescriptor - .Builder().name("Port") - .description("The port on the destination.") - .required(true) - .addValidator(StandardValidators.PORT_VALIDATOR) - .expressionLanguageSupported(true) - .build(); - public static final PropertyDescriptor MAX_SOCKET_SEND_BUFFER_SIZE = new PropertyDescriptor.Builder() - .name("Max Size of Socket Send Buffer") - .description("The maximum size of the socket send buffer that should be used. This is a suggestion to the Operating System " + - "to indicate how big the socket buffer should be. 
If this value is set too low, the buffer may fill up before " + - "the data can be read, and incoming data will be dropped.") - .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) - .defaultValue("1 MB") - .required(true) - .build(); - public static final PropertyDescriptor IDLE_EXPIRATION = new PropertyDescriptor - .Builder().name("Idle Connection Expiration") - .description("The amount of time a connection should be held open without being used before closing the connection.") - .required(true) - .defaultValue("5 seconds") - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .build(); - - // Putting these properties here so sub-classes don't have to redefine them, but they are - // not added to the properties by default since not all processors may need them - - public static final AllowableValue TCP_VALUE = new AllowableValue("TCP", "TCP"); - public static final AllowableValue UDP_VALUE = new AllowableValue("UDP", "UDP"); - - public static final PropertyDescriptor PROTOCOL = new PropertyDescriptor - .Builder().name("Protocol") - .description("The protocol for communication.") - .required(true) - .allowableValues(TCP_VALUE, UDP_VALUE) - .defaultValue(TCP_VALUE.getValue()) - .build(); - public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder() - .name("Message Delimiter") - .description("Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. " - + "If not specified, the entire content of the FlowFile will be used as a single message. " - + "If specified, the contents of the FlowFile will be split on this delimiter and each section " - + "sent as a separate message. 
Note that if messages are delimited and some messages for a given FlowFile " - + "are transferred successfully while others are not, the messages will be split into individual FlowFiles, such that those " - + "messages that were successfully sent are routed to the 'success' relationship while other messages are sent to the 'failure' " - + "relationship.") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .build(); - public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() - .name("Character Set") - .description("Specifies the character set of the data being sent.") - .required(true) - .defaultValue("UTF-8") - .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) - .build(); - public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder() - .name("Timeout") - .description("The timeout for connecting to and communicating with the destination. Does not apply to UDP") - .required(false) - .defaultValue("10 seconds") - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .build(); - public static final PropertyDescriptor OUTGOING_MESSAGE_DELIMITER = new PropertyDescriptor.Builder() - .name("Outgoing Message Delimiter") - .description("Specifies the delimiter to use when sending messages out over the same TCP stream. The delimiter is appended to each FlowFile message " - + "that is transmitted over the stream so that the receiver can determine when one message ends and the next message begins. Users should " - + "ensure that the FlowFile content does not contain the delimiter character to avoid errors. In order to use a new line character you can " - + "enter '\\n'. For a tab character use '\\t'. 
Finally for a carriage return use '\\r'.") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .defaultValue("\\n") - .expressionLanguageSupported(true) - .build(); - public static final PropertyDescriptor CONNECTION_PER_FLOWFILE = new PropertyDescriptor.Builder() - .name("Connection Per FlowFile") - .description("Specifies whether to send each FlowFile's content on an individual connection.") - .required(true) - .defaultValue("false") - .allowableValues("true", "false") - .build(); - - public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() - .name("SSL Context Service") - .description("The Controller Service to use in order to obtain an SSL Context. If this property is set, " + - "messages will be sent over a secure connection.") - .required(false) - .identifiesControllerService(SSLContextService.class) - .build(); - - public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("FlowFiles that are sent successfully to the destination are sent out this relationship.") - .build(); - public static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .description("FlowFiles that failed to send to the destination are sent out this relationship.") - .build(); - - private Set<Relationship> relationships; - private List<PropertyDescriptor> descriptors; - - protected volatile String transitUri; - protected volatile BlockingQueue<ChannelSender> senderPool; - - protected final BlockingQueue<FlowFileMessageBatch> completeBatches = new LinkedBlockingQueue<>(); - protected final Set<FlowFileMessageBatch> activeBatches = Collections.synchronizedSet(new HashSet<FlowFileMessageBatch>()); - - @Override - protected void init(final ProcessorInitializationContext context) { - final List<PropertyDescriptor> descriptors = new ArrayList<>(); - descriptors.add(HOSTNAME); - descriptors.add(PORT); - descriptors.add(MAX_SOCKET_SEND_BUFFER_SIZE); - 
descriptors.add(IDLE_EXPIRATION); - descriptors.addAll(getAdditionalProperties()); - this.descriptors = Collections.unmodifiableList(descriptors); - - final Set<Relationship> relationships = new HashSet<>(); - relationships.add(REL_SUCCESS); - relationships.add(REL_FAILURE); - relationships.addAll(getAdditionalRelationships()); - this.relationships = Collections.unmodifiableSet(relationships); - } - - /** - * Override to provide additional relationships for the processor. - * - * @return a list of relationships - */ - protected List<Relationship> getAdditionalRelationships() { - return Collections.EMPTY_LIST; - } - - /** - * Override to provide additional properties for the processor. - * - * @return a list of properties - */ - protected List<PropertyDescriptor> getAdditionalProperties() { - return Collections.EMPTY_LIST; - } - - @Override - public final Set<Relationship> getRelationships() { - return this.relationships; - } - - @Override - public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return descriptors; - } - - @OnScheduled - public void onScheduled(final ProcessContext context) throws IOException { - // initialize the queue of senders, one per task, senders will get created on the fly in onTrigger - this.senderPool = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks()); - this.transitUri = createTransitUri(context); - } - - @OnStopped - public void closeSenders() { - if (senderPool != null) { - ChannelSender sender = senderPool.poll(); - while (sender != null) { - sender.close(); - sender = senderPool.poll(); - } - } - } - - /** - * Sub-classes construct a transit uri for provenance events. Called from @OnScheduled - * method of this class. - * - * @param context the current context - * - * @return the transit uri - */ - protected abstract String createTransitUri(final ProcessContext context); - - /** - * Sub-classes create a ChannelSender given a context. 
- * - * @param context the current context - * @return an implementation of ChannelSender - * @throws IOException if an error occurs creating the ChannelSender - */ - protected abstract ChannelSender createSender(final ProcessContext context) throws IOException; - - /** - * Close any senders that haven't been active with in the given threshold - * - * @param idleThreshold the threshold to consider a sender as idle - */ - protected void pruneIdleSenders(final long idleThreshold) { - long currentTime = System.currentTimeMillis(); - final List<ChannelSender> putBack = new ArrayList<>(); - - // if a connection hasn't been used with in the threshold then it gets closed - ChannelSender sender; - while ((sender = senderPool.poll()) != null) { - if (currentTime > (sender.getLastUsed() + idleThreshold)) { - getLogger().debug("Closing idle connection..."); - sender.close(); - } else { - putBack.add(sender); - } - } - // re-queue senders that weren't idle, but if the queue is full then close the sender - for (ChannelSender putBackSender : putBack) { - boolean returned = senderPool.offer(putBackSender); - if (!returned) { - putBackSender.close(); - } - } - } - - /** - * Helper for sub-classes to create a sender. 
- * - * @param protocol the protocol for the sender - * @param host the host to send to - * @param port the port to send to - * @param timeout the timeout for connecting and communicating over the channel - * @param maxSendBufferSize the maximum size of the socket send buffer - * @param sslContext an SSLContext, or null if not using SSL - * - * @return a ChannelSender based on the given properties - * - * @throws IOException if an error occurs creating the sender - */ - protected ChannelSender createSender(final String protocol, - final String host, - final int port, - final int timeout, - final int maxSendBufferSize, - final SSLContext sslContext) throws IOException { - - ChannelSender sender; - if (protocol.equals(UDP_VALUE.getValue())) { - sender = new DatagramChannelSender(host, port, maxSendBufferSize, getLogger()); - } else { - // if an SSLContextService is provided then we make a secure sender - if (sslContext != null) { - sender = new SSLSocketChannelSender(host, port, maxSendBufferSize, sslContext, getLogger()); - } else { - sender = new SocketChannelSender(host, port, maxSendBufferSize, getLogger()); - } - } - - sender.setTimeout(timeout); - sender.open(); - return sender; - } - - /** - * Helper method to acquire an available ChannelSender from the pool. If the pool is empty then the a new sender is created. - * - * @param context - * - the current process context. - * - * @param session - * - the current process session. - * @param flowFile - * - the FlowFile being processed in this session. - * - * @return ChannelSender - the sender that has been acquired or null if no sender is available and a new sender cannot be created. 
- */ - protected ChannelSender acquireSender(final ProcessContext context, final ProcessSession session, final FlowFile flowFile) { - ChannelSender sender = senderPool.poll(); - if (sender == null) { - try { - getLogger().debug("No available connections, creating a new one..."); - sender = createSender(context); - } catch (IOException e) { - getLogger().error("No available connections, and unable to create a new one, transferring {} to failure", - new Object[]{flowFile}, e); - session.transfer(flowFile, REL_FAILURE); - session.commit(); - context.yield(); - sender = null; - } - } - - return sender; - } - - - /** - * Helper method to relinquish the ChannelSender back to the pool. If the sender is disconnected or the pool is full - * then the sender is closed and discarded. - * - * @param sender the sender to return or close - */ - protected void relinquishSender(final ChannelSender sender) { - if (sender != null) { - // if the connection is still open then then try to return the sender to the pool. - if (sender.isConnected()) { - boolean returned = senderPool.offer(sender); - // if the pool is full then close the sender. - if (!returned) { - sender.close(); - } - } else { - // probably already closed here, but quietly close anyway to be safe. - sender.close(); - } - } - } - - /** - * Represents a range of messages from a FlowFile. - */ - protected static class Range { - private final long start; - private final long end; - - public Range(final long start, final long end) { - this.start = start; - this.end = end; - } - - public long getStart() { - return start; - } - - public long getEnd() { - return end; - } - - @Override - public String toString() { - return "Range[" + start + "-" + end + "]"; - } - } - - /** - * A wrapper to hold the ranges of a FlowFile that were successful and ranges that failed, and then - * transfer those ranges appropriately. 
- */ - protected class FlowFileMessageBatch { - - private final ProcessSession session; - private final FlowFile flowFile; - private final long startTime = System.nanoTime(); - - private final List<Range> successfulRanges = new ArrayList<>(); - private final List<Range> failedRanges = new ArrayList<>(); - - private Exception lastFailureReason; - private long numMessages = -1L; - private long completeTime = 0L; - private boolean canceled = false; - - public FlowFileMessageBatch(final ProcessSession session, final FlowFile flowFile) { - this.session = session; - this.flowFile = flowFile; - } - - public synchronized void cancelOrComplete() { - if (isComplete()) { - completeSession(); - return; - } - - this.canceled = true; - - session.rollback(); - successfulRanges.clear(); - failedRanges.clear(); - } - - public synchronized void addSuccessfulRange(final long start, final long end) { - if (canceled) { - return; - } - - successfulRanges.add(new Range(start, end)); - - if (isComplete()) { - activeBatches.remove(this); - completeBatches.add(this); - completeTime = System.nanoTime(); - } - } - - public synchronized void addFailedRange(final long start, final long end, final Exception e) { - if (canceled) { - return; - } - - failedRanges.add(new Range(start, end)); - lastFailureReason = e; - - if (isComplete()) { - activeBatches.remove(this); - completeBatches.add(this); - completeTime = System.nanoTime(); - } - } - - private boolean isComplete() { - return !canceled && (numMessages > -1) && (successfulRanges.size() + failedRanges.size() >= numMessages); - } - - public synchronized void setNumMessages(final long msgCount) { - this.numMessages = msgCount; - - if (isComplete()) { - activeBatches.remove(this); - completeBatches.add(this); - completeTime = System.nanoTime(); - } - } - - private void transferRanges(final List<Range> ranges, final Relationship relationship) { - Collections.sort(ranges, new Comparator<Range>() { - @Override - public int compare(final Range o1, 
final Range o2) { - return Long.compare(o1.getStart(), o2.getStart()); - } - }); - - for (int i = 0; i < ranges.size(); i++) { - Range range = ranges.get(i); - int count = 1; - - while (i + 1 < ranges.size()) { - // Check if the next range in the List continues where this one left off. - final Range nextRange = ranges.get(i + 1); - - if (nextRange.getStart() == range.getEnd()) { - // We have two ranges in a row that are contiguous; combine them into a single Range. - range = new Range(range.getStart(), nextRange.getEnd()); - - count++; - i++; - } else { - break; - } - } - - // Create a FlowFile for this range. - FlowFile child = session.clone(flowFile, range.getStart(), range.getEnd() - range.getStart()); - if (relationship == REL_SUCCESS) { - session.getProvenanceReporter().send(child, transitUri, "Sent " + count + " messages"); - session.transfer(child, relationship); - } else { - child = session.penalize(child); - session.transfer(child, relationship); - } - } - } - - public synchronized void completeSession() { - if (canceled) { - return; - } - - if (successfulRanges.isEmpty() && failedRanges.isEmpty()) { - getLogger().info("Completed processing {} but sent 0 FlowFiles", new Object[] {flowFile}); - session.transfer(flowFile, REL_SUCCESS); - session.commit(); - return; - } - - if (successfulRanges.isEmpty()) { - getLogger().error("Failed to send {}; routing to 'failure'; last failure reason reported was {};", new Object[] {flowFile, lastFailureReason}); - final FlowFile penalizedFlowFile = session.penalize(flowFile); - session.transfer(penalizedFlowFile, REL_FAILURE); - session.commit(); - return; - } - - if (failedRanges.isEmpty()) { - final long transferMillis = TimeUnit.NANOSECONDS.toMillis(completeTime - startTime); - session.getProvenanceReporter().send(flowFile, transitUri, "Sent " + successfulRanges.size() + " messages;", transferMillis); - session.transfer(flowFile, REL_SUCCESS); - getLogger().info("Successfully sent {} messages for {} in {} millis", new 
Object[] {successfulRanges.size(), flowFile, transferMillis}); - session.commit(); - return; - } - - // At this point, the successful ranges is not empty and the failed ranges is not empty. This indicates that some messages made their way - // successfully and some failed. We will address this by splitting apart the source FlowFile into children and sending the successful messages to 'success' - // and the failed messages to 'failure'. - transferRanges(successfulRanges, REL_SUCCESS); - transferRanges(failedRanges, REL_FAILURE); - session.remove(flowFile); - getLogger().error("Successfully sent {} messages, but failed to send {} messages; the last error received was {}", - new Object[] {successfulRanges.size(), failedRanges.size(), lastFailureReason}); - session.commit(); - } - } - - /** - * Gets the current value of the "Outgoing Message Delimiter" property and parses the special characters. - * - * @param context - * - the current process context. - * @param flowFile - * - the FlowFile being processed. - * - * @return String containing the Delimiter value. - */ - protected String getOutgoingMessageDelimiter(final ProcessContext context, final FlowFile flowFile) { - String delimiter = context.getProperty(OUTGOING_MESSAGE_DELIMITER).evaluateAttributeExpressions(flowFile).getValue(); - if (delimiter != null) { - delimiter = delimiter.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t"); - } - return delimiter; - } -}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/ChannelSender.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/ChannelSender.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/ChannelSender.java deleted file mode 100644 index 278a9ab..0000000 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/ChannelSender.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processor.util.put.sender; - -import org.apache.nifi.logging.ComponentLog; -import java.io.IOException; -import java.nio.charset.Charset; - -/** - * Base class for sending messages over a channel. 
- */ -public abstract class ChannelSender { - - protected final int port; - protected final String host; - protected final int maxSendBufferSize; - protected final ComponentLog logger; - - protected volatile int timeout = 10000; - protected volatile long lastUsed; - - public ChannelSender(final String host, final int port, final int maxSendBufferSize, final ComponentLog logger) { - this.port = port; - this.host = host; - this.maxSendBufferSize = maxSendBufferSize; - this.logger = logger; - } - - public void setTimeout(int timeout) { - this.timeout = timeout; - } - - public int getTimeout() { - return timeout; - } - - /** - * @return the last time data was sent over this channel - */ - public long getLastUsed() { - return lastUsed; - } - - /** - * Opens the connection to the destination. - * - * @throws IOException if an error occurred opening the connection. - */ - public abstract void open() throws IOException; - - /** - * Sends the given string over the channel. - * - * @param message the message to send over the channel - * @throws IOException if there was an error communicating over the channel - */ - public void send(final String message, final Charset charset) throws IOException { - final byte[] bytes = message.getBytes(charset); - send(bytes); - } - - /** - * Sends the given data over the channel. - * - * @param data the data to send over the channel - * @throws IOException if there was an error communicating over the channel - */ - public void send(final byte[] data) throws IOException { - try { - write(data); - lastUsed = System.currentTimeMillis(); - } catch (IOException e) { - // failed to send data over the channel, we close it to force - // the creation of a new one next time - close(); - throw e; - } - } - - /** - * Write the given buffer to the underlying channel. 
- */ - protected abstract void write(byte[] data) throws IOException; - - /** - * @return true if the underlying channel is connected - */ - public abstract boolean isConnected(); - - /** - * Close the underlying channel - */ - public abstract void close(); - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/DatagramChannelSender.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/DatagramChannelSender.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/DatagramChannelSender.java deleted file mode 100644 index 0b2dfb8..0000000 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/DatagramChannelSender.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.processor.util.put.sender; - -import org.apache.commons.io.IOUtils; -import org.apache.nifi.logging.ComponentLog; -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.StandardSocketOptions; -import java.nio.ByteBuffer; -import java.nio.channels.DatagramChannel; - -/** - * Sends messages over a DatagramChannel. - */ -public class DatagramChannelSender extends ChannelSender { - - private DatagramChannel channel; - - public DatagramChannelSender(final String host, final int port, final int maxSendBufferSize, final ComponentLog logger) { - super(host, port, maxSendBufferSize, logger); - } - - @Override - public void open() throws IOException { - if (channel == null) { - channel = DatagramChannel.open(); - - if (maxSendBufferSize > 0) { - channel.setOption(StandardSocketOptions.SO_SNDBUF, maxSendBufferSize); - final int actualSendBufSize = channel.getOption(StandardSocketOptions.SO_SNDBUF); - if (actualSendBufSize < maxSendBufferSize) { - logger.warn("Attempted to set Socket Send Buffer Size to " + maxSendBufferSize - + " bytes but could only set to " + actualSendBufSize + "bytes. 
You may want to " - + "consider changing the Operating System's maximum receive buffer"); - } - } - } - - if (!channel.isConnected()) { - channel.connect(new InetSocketAddress(InetAddress.getByName(host), port)); - } - } - - @Override - protected void write(byte[] data) throws IOException { - ByteBuffer buffer = ByteBuffer.wrap(data); - while (buffer.hasRemaining()) { - channel.write(buffer); - } - } - - @Override - public boolean isConnected() { - return channel != null && channel.isConnected(); - } - - @Override - public void close() { - IOUtils.closeQuietly(channel); - channel = null; - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/SSLSocketChannelSender.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/SSLSocketChannelSender.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/SSLSocketChannelSender.java deleted file mode 100644 index a70c9c5..0000000 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/SSLSocketChannelSender.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processor.util.put.sender; - -import org.apache.commons.io.IOUtils; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; - -import javax.net.ssl.SSLContext; -import java.io.IOException; - -/** - * Sends messages over an SSLSocketChannel. - */ -public class SSLSocketChannelSender extends SocketChannelSender { - - private SSLSocketChannel sslChannel; - private SSLContext sslContext; - - public SSLSocketChannelSender(final String host, - final int port, - final int maxSendBufferSize, - final SSLContext sslContext, - final ComponentLog logger) { - super(host, port, maxSendBufferSize, logger); - this.sslContext = sslContext; - } - - @Override - public void open() throws IOException { - if (sslChannel == null) { - super.open(); - sslChannel = new SSLSocketChannel(sslContext, channel, true); - } - sslChannel.setTimeout(timeout); - - // SSLSocketChannel will check if already connected so we can safely call this - sslChannel.connect(); - } - - @Override - protected void write(byte[] data) throws IOException { - sslChannel.write(data); - } - - @Override - public boolean isConnected() { - return sslChannel != null && !sslChannel.isClosed(); - } - - @Override - public void close() { - super.close(); - IOUtils.closeQuietly(sslChannel); - sslChannel = null; - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/SocketChannelSender.java 
---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/SocketChannelSender.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/SocketChannelSender.java deleted file mode 100644 index 8d4f875..0000000 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/SocketChannelSender.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processor.util.put.sender; - -import org.apache.commons.io.IOUtils; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.remote.io.socket.SocketChannelOutputStream; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketTimeoutException; -import java.net.StandardSocketOptions; -import java.nio.channels.SocketChannel; - -/** - * Sends messages over a SocketChannel. 
- */ -public class SocketChannelSender extends ChannelSender { - - protected SocketChannel channel; - protected SocketChannelOutputStream socketChannelOutput; - - public SocketChannelSender(final String host, final int port, final int maxSendBufferSize, final ComponentLog logger) { - super(host, port, maxSendBufferSize, logger); - } - - @Override - public void open() throws IOException { - if (channel == null) { - channel = SocketChannel.open(); - channel.configureBlocking(false); - - if (maxSendBufferSize > 0) { - channel.setOption(StandardSocketOptions.SO_SNDBUF, maxSendBufferSize); - final int actualSendBufSize = channel.getOption(StandardSocketOptions.SO_SNDBUF); - if (actualSendBufSize < maxSendBufferSize) { - logger.warn("Attempted to set Socket Send Buffer Size to " + maxSendBufferSize - + " bytes but could only set to " + actualSendBufSize + "bytes. You may want to " - + "consider changing the Operating System's maximum send buffer"); - } - } - } - - if (!channel.isConnected()) { - final long startTime = System.currentTimeMillis(); - final InetSocketAddress socketAddress = new InetSocketAddress(InetAddress.getByName(host), port); - - if (!channel.connect(socketAddress)) { - while (!channel.finishConnect()) { - if (System.currentTimeMillis() > startTime + timeout) { - throw new SocketTimeoutException("Timed out connecting to " + host + ":" + port); - } - - try { - Thread.sleep(50L); - } catch (final InterruptedException e) { - } - } - } - - socketChannelOutput = new SocketChannelOutputStream(channel); - socketChannelOutput.setTimeout(timeout); - } - } - - @Override - protected void write(byte[] data) throws IOException { - socketChannelOutput.write(data); - } - - @Override - public boolean isConnected() { - return channel != null && channel.isConnected(); - } - - @Override - public void close() { - IOUtils.closeQuietly(socketChannelOutput); - IOUtils.closeQuietly(channel); - socketChannelOutput = null; - channel = null; - } -} 
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/pattern/TestExceptionHandler.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/pattern/TestExceptionHandler.java b/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/pattern/TestExceptionHandler.java deleted file mode 100644 index bd73379..0000000 --- a/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/pattern/TestExceptionHandler.java +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.processor.util.pattern; - -import org.apache.nifi.processor.exception.ProcessException; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Arrays; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Function; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -public class TestExceptionHandler { - - private static final Logger logger = LoggerFactory.getLogger(TestExceptionHandler.class); - - /** - * Simulate an external procedure. - */ - static class ExternalProcedure { - private boolean available = true; - int divide(Integer a, Integer b) throws Exception { - if (!available) { - throw new IOException("Not available"); - } - if (a == 10) { - throw new IllegalStateException("Service for 10 is not currently available."); - } - return a / b; - } - } - - private class Context { - int count = 0; - } - - @Test - public void testBasicUsage() { - - final ExternalProcedure p = new ExternalProcedure(); - - try { - // Although a catch-exception has to be caught each possible call, - // usually the error handling logic will be the same. - // Ends up having a lot of same code. - final int r1 = p.divide(4, 2); - assertEquals(2, r1); - } catch (Exception e) { - e.printStackTrace(); - } - - final Context context = new Context(); - final ExceptionHandler<Context> handler = new ExceptionHandler<>(); - - // Using handler can avoid the try catch block with reusable error handling logic. - handler.execute(context, 6, i -> { - final int r2 = p.divide(i, 2); - assertEquals(3, r2); - }); - - // If return value is needed, use AtomicReference. 
- AtomicReference<Integer> r = new AtomicReference<>(); - handler.execute(context, 8, i -> r.set(p.divide(i, 2))); - assertEquals(4, r.get().intValue()); - - // If no exception mapping is specified, any Exception thrown is wrapped by ProcessException. - try { - final Integer nullInput = null; - handler.execute(context, nullInput, i -> r.set(p.divide(i, 2))); - fail("Exception should be thrown because input is null."); - } catch (ProcessException e) { - assertTrue(e.getCause() instanceof NullPointerException); - } - } - - // Reusable Exception mapping function. - static Function<Exception, ErrorTypes> exceptionMapping = i -> { - try { - throw i; - } catch (NullPointerException | ArithmeticException | NumberFormatException e) { - return ErrorTypes.InvalidInput; - } catch (IllegalStateException e) { - return ErrorTypes.TemporalInputFailure; - } catch (IOException e) { - return ErrorTypes.TemporalFailure; - } catch (Exception e) { - throw new ProcessException(e); - } - }; - - @Test - public void testHandling() { - - final ExternalProcedure p = new ExternalProcedure(); - final Context context = new Context(); - - final ExceptionHandler<Context> handler = new ExceptionHandler<>(); - handler.mapException(exceptionMapping); - handler.onError(createInputErrorHandler()); - - // Benefit of handler is being able to externalize error handling, make it simpler. - handler.execute(context, 4, i -> { - final int r = p.divide(i, 2); - assertEquals(2, r); - }); - - // Null pointer exception. - final Integer input = null; - handler.execute(context, input, i -> { - p.divide(i, 2); - fail("Shouldn't reach here."); - }); - - // Divide by zero. 
- handler.execute(context, 0, i -> { - p.divide(2, i); - fail("Shouldn't reach here."); - }); - - - } - - static <C> ExceptionHandler.OnError<C, Integer> createInputErrorHandler() { - return (c, i, r, e) -> { - switch (r.destination()) { - case ProcessException: - throw new ProcessException(String.format("Execution failed due to %s", e), e); - default: - logger.warn(String.format("Routing to %s: %d caused %s", r, i, e)); - } - }; - } - - static <C> ExceptionHandler.OnError<C, Integer[]> createArrayInputErrorHandler() { - return (c, i, r, e) -> { - switch (r.destination()) { - case ProcessException: - throw new ProcessException(String.format("Execution failed due to %s", e), e); - default: - logger.warn(String.format("Routing to %s: %d, %d caused %s", r, i[0], i[1], e)); - } - }; - } - - @Test - public void testHandlingLoop() { - - final ExternalProcedure p = new ExternalProcedure(); - final Context context = new Context(); - - final ExceptionHandler<Context> handler = new ExceptionHandler<>(); - handler.mapException(exceptionMapping); - handler.onError(createArrayInputErrorHandler()); - - // It's especially handy when looping through inputs. [a, b, expected result] - Integer[][] inputs = new Integer[][]{{4, 2, 2}, {null, 2, 999}, {2, 0, 999}, {10, 2, 999}, {8, 2, 4}}; - - Arrays.stream(inputs).forEach(input -> handler.execute(context, input, (in) -> { - final Integer r = p.divide(in[0], in[1]); - // This is safe because if p.divide throws error, this code won't be executed. - assertEquals(in[2], r); - })); - - AtomicReference<Integer> r = new AtomicReference<>(); - for (Integer[] input : inputs) { - - if (!handler.execute(context, input, (in) -> { - r.set(p.divide(in[0], in[1])); - context.count++; - })){ - // Handler returns false when it fails. - // Cleaner if-exception-continue-next-input can be written cleaner. 
- continue; - } - - assertEquals(input[2], r.get()); - } - - assertEquals("Successful inputs", 2, context.count); - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/pattern/TestRollbackOnFailure.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/pattern/TestRollbackOnFailure.java b/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/pattern/TestRollbackOnFailure.java deleted file mode 100644 index 6d73759..0000000 --- a/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/pattern/TestRollbackOnFailure.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.processor.util.pattern; - -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.util.pattern.TestExceptionHandler.ExternalProcedure; -import org.apache.nifi.util.MockComponentLog; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.List; - -import static org.apache.nifi.processor.util.pattern.TestExceptionHandler.createArrayInputErrorHandler; -import static org.apache.nifi.processor.util.pattern.TestExceptionHandler.exceptionMapping; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -public class TestRollbackOnFailure { - - private static final Logger logger = LoggerFactory.getLogger(TestRollbackOnFailure.class); - - /** - * This can be an example for how to compose an ExceptionHandler instance by reusable functions. - * @param logger used to log messages within functions - * @return a composed ExceptionHandler - */ - private ExceptionHandler<RollbackOnFailure> getContextAwareExceptionHandler(ComponentLog logger) { - final ExceptionHandler<RollbackOnFailure> handler = new ExceptionHandler<>(); - handler.mapException(exceptionMapping); - handler.adjustError(RollbackOnFailure.createAdjustError(logger)); - handler.onError(createArrayInputErrorHandler()); - return handler; - } - - private void processInputs(RollbackOnFailure context, Integer[][] inputs, List<Integer> results) { - final ExternalProcedure p = new ExternalProcedure(); - final MockComponentLog componentLog = new MockComponentLog("processor-id", this); - final ExceptionHandler<RollbackOnFailure> handler = getContextAwareExceptionHandler(componentLog); - - for (Integer[] input : inputs) { - - if (!handler.execute(context, input, (in) -> { - results.add(p.divide(in[0], in[1])); - context.proceed(); - })){ - continue; - } - - assertEquals(input[2], results.get(results.size() - 
1)); - } - } - - @Test - public void testContextDefaultBehavior() { - - // Disabling rollbackOnFailure would route Failure or Retry as they are. - final RollbackOnFailure context = new RollbackOnFailure(false, false); - - Integer[][] inputs = new Integer[][]{{null, 2, 999}, {4, 2, 2}, {2, 0, 999}, {10, 2, 999}, {8, 2, 4}}; - - final List<Integer> results = new ArrayList<>(); - try { - processInputs(context, inputs, results); - } catch (ProcessException e) { - fail("ProcessException should NOT be thrown"); - } - - assertEquals("Successful inputs", 2, context.getProcessedCount()); - } - - @Test - public void testContextRollbackOnFailureNonTransactionalFirstFailure() { - - final RollbackOnFailure context = new RollbackOnFailure(true, false); - - // If the first execution fails without any succeeded inputs, it should throw a ProcessException. - Integer[][] inputs = new Integer[][]{{null, 2, 999}, {4, 2, 2}, {2, 0, 999}, {10, 2, 999}, {8, 2, 4}}; - - final List<Integer> results = new ArrayList<>(); - try { - processInputs(context, inputs, results); - fail("ProcessException should be thrown"); - } catch (ProcessException e) { - logger.info("Exception was thrown as expected."); - } - - assertEquals("Successful inputs", 0, context.getProcessedCount()); - } - - @Test - public void testContextRollbackOnFailureNonTransactionalAlreadySucceeded() { - - final RollbackOnFailure context = new RollbackOnFailure(true, false); - - // If an execution fails after succeeded inputs, it transfer the input to Failure instead of ProcessException, - // and keep going. Because the external system does not support transaction. 
- Integer[][] inputs = new Integer[][]{{4, 2, 2}, {2, 0, 999}, {null, 2, 999}, {10, 2, 999}, {8, 2, 4}}; - - final List<Integer> results = new ArrayList<>(); - try { - processInputs(context, inputs, results); - } catch (ProcessException e) { - fail("ProcessException should NOT be thrown"); - } - - assertEquals("Successful inputs", 2, context.getProcessedCount()); - } - - @Test - public void testContextRollbackOnFailureTransactionalAlreadySucceeded() { - - final RollbackOnFailure context = new RollbackOnFailure(true, true); - - // Even if an execution fails after succeeded inputs, it throws a ProcessException instead of transferring the input to Failure, - // because the external system supports transactions. - Integer[][] inputs = new Integer[][]{{4, 2, 2}, {2, 0, 999}, {null, 2, 999}, {10, 2, 999}, {8, 2, 4}}; - - final List<Integer> results = new ArrayList<>(); - try { - processInputs(context, inputs, results); - fail("ProcessException should be thrown"); - } catch (ProcessException e) { - logger.info("Exception was thrown as expected."); - } - - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/pom.xml b/nifi-commons/nifi-record/pom.xml new file mode 100644 index 0000000..57e0e66 --- /dev/null +++ b/nifi-commons/nifi-record/pom.xml @@ -0,0 +1,31 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-commons</artifactId> + <version>1.2.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-record</artifactId> + <description> + This module contains the domain model for NiFi's Record abstraction, including + several interfaces for interacting with Records. This module should not depend + on any external libraries. + </description> + +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaField.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaField.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaField.java new file mode 100644 index 0000000..2fe06f4 --- /dev/null +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaField.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.schema.access; + +public enum SchemaField { + SCHEMA_TEXT("Schema Text"), + SCHEMA_TEXT_FORMAT("Schema Text Format"), + SCHEMA_NAME("Schema Name"), + SCHEMA_IDENTIFIER("Schema Identifier"), + SCHEMA_VERSION("Schema Version"); + + private final String description; + + private SchemaField(final String description) { + this.description = description; + } + + @Override + public String toString() { + return description; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaNotFoundException.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaNotFoundException.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaNotFoundException.java new file mode 100644 index 0000000..9a064ff --- /dev/null +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaNotFoundException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.schema.access; + +public class SchemaNotFoundException extends Exception { + public SchemaNotFoundException(final String message) { + super(message); + } + + public SchemaNotFoundException(final String message, final Throwable cause) { + super(message, cause); + } + + public SchemaNotFoundException(final Throwable cause) { + super(cause); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/MalformedRecordException.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/MalformedRecordException.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/MalformedRecordException.java new file mode 100644 index 0000000..d45a850 --- /dev/null +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/MalformedRecordException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.serialization; + +/** + * An Exception that can be thrown to indicate that data was read but could not properly be parsed + */ +public class MalformedRecordException extends Exception { + public MalformedRecordException(final String message) { + super(message); + } + + public MalformedRecordException(final String message, final Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordReader.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordReader.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordReader.java new file mode 100644 index 0000000..add248e --- /dev/null +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordReader.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.serialization; + +import java.io.Closeable; +import java.io.IOException; + +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; + +/** + * <p> + * A RecordReader is responsible for parsing data and returning a record at a time + * in order to allow the caller to iterate over the records individually. + * </p> + * + * <p> + * PLEASE NOTE: This interface is still considered 'unstable' and may change in a non-backward-compatible + * manner between minor or incremental releases of NiFi. + * </p> + */ +public interface RecordReader extends Closeable { + + /** + * Returns the next record in the stream or <code>null</code> if no more records are available. + * + * @return the next record in the stream or <code>null</code> if no more records are available. 
+ * + * @throws IOException if unable to read from the underlying data + * @throws MalformedRecordException if an unrecoverable failure occurs when trying to parse a record + */ + Record nextRecord() throws IOException, MalformedRecordException; + + /** + * @return a RecordSchema that is appropriate for the records in the stream + * @throws MalformedRecordException if an unrecoverable failure occurs when trying to parse the underlying data + */ + RecordSchema getSchema() throws MalformedRecordException; + + /** + * @return a RecordSet that returns the records in this Record Reader in a streaming fashion + */ + default RecordSet createRecordSet() { + return new RecordSet() { + @Override + public RecordSchema getSchema() throws IOException { + try { + return RecordReader.this.getSchema(); + } catch (final MalformedRecordException mre) { + throw new IOException(mre); + } + } + + @Override + public Record next() throws IOException { + try { + return RecordReader.this.nextRecord(); + } catch (final MalformedRecordException mre) { + throw new IOException(mre); + } + } + }; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordSetWriter.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordSetWriter.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordSetWriter.java new file mode 100644 index 0000000..7d6fa1c --- /dev/null +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordSetWriter.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.serialization; + +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.nifi.serialization.record.RecordSet; + +/** + * <p> + * A RecordSetWriter is responsible for writing a RecordSet to a given {@link OutputStream}. + * </p> + * + * <p> + * PLEASE NOTE: This interface is still considered 'unstable' and may change in a non-backward-compatible + * manner between minor or incremental releases of NiFi. 
+ * </p> + */ +public interface RecordSetWriter extends RecordWriter { + /** + * Writes the given record set to the given output stream + * + * @param recordSet the record set to serialize + * @param out the OutputStream to write to + * @return the results of writing the data + * @throws IOException if unable to write to the given OutputStream + */ + WriteResult write(RecordSet recordSet, OutputStream out) throws IOException; +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java new file mode 100644 index 0000000..aa298d9 --- /dev/null +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.serialization; + +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.nifi.serialization.record.Record; + +public interface RecordWriter { + /** + * Writes the given record to the given output stream + * + * @param record the record to serialize + * @param out the OutputStream to write to + * @return the results of writing the data + * @throws IOException if unable to write to the given OutputStream + */ + WriteResult write(Record record, OutputStream out) throws IOException; + + /** + * @return the MIME Type that this Record Writer produces. This will be added to FlowFiles using + * the mime.type attribute. + */ + String getMimeType(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java new file mode 100644 index 0000000..017aef1 --- /dev/null +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.serialization; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.stream.Collectors; + +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.SchemaIdentifier; + +public class SimpleRecordSchema implements RecordSchema { + private final List<RecordField> fields; + private final Map<String, Integer> fieldIndices; + private final boolean textAvailable; + private final String text; + private final String schemaFormat; + private final SchemaIdentifier schemaIdentifier; + + public SimpleRecordSchema(final List<RecordField> fields) { + this(fields, createText(fields), null, false, SchemaIdentifier.EMPTY); + } + + public SimpleRecordSchema(final List<RecordField> fields, final SchemaIdentifier id) { + this(fields, createText(fields), null, false, id); + } + + public SimpleRecordSchema(final List<RecordField> fields, final String text, final String schemaFormat, final SchemaIdentifier id) { + this(fields, text, schemaFormat, true, id); + } + + private SimpleRecordSchema(final List<RecordField> fields, final String text, final String schemaFormat, final boolean textAvailable, final SchemaIdentifier id) { + this.text = text; + this.schemaFormat = schemaFormat; + this.schemaIdentifier = id; + this.textAvailable = 
textAvailable; + this.fields = Collections.unmodifiableList(new ArrayList<>(fields)); + this.fieldIndices = new HashMap<>(fields.size()); + + int index = 0; + for (final RecordField field : fields) { + Integer previousValue = fieldIndices.put(field.getFieldName(), index); + if (previousValue != null) { + throw new IllegalArgumentException("Two fields are given with the same name (or alias) of '" + field.getFieldName() + "'"); + } + + for (final String alias : field.getAliases()) { + previousValue = fieldIndices.put(alias, index); + if (previousValue != null) { + throw new IllegalArgumentException("Two fields are given with the same name (or alias) of '" + field.getFieldName() + "'"); + } + } + + index++; + } + } + + @Override + public Optional<String> getSchemaText() { + if (textAvailable) { + return Optional.ofNullable(text); + } else { + return Optional.empty(); + } + } + + + @Override + public Optional<String> getSchemaFormat() { + return Optional.ofNullable(schemaFormat); + } + + @Override + public List<RecordField> getFields() { + return fields; + } + + @Override + public int getFieldCount() { + return fields.size(); + } + + @Override + public RecordField getField(final int index) { + return fields.get(index); + } + + @Override + public List<DataType> getDataTypes() { + return getFields().stream().map(recordField -> recordField.getDataType()) + .collect(Collectors.toList()); + } + + @Override + public List<String> getFieldNames() { + return getFields().stream().map(recordField -> recordField.getFieldName()) + .collect(Collectors.toList()); + } + + @Override + public Optional<DataType> getDataType(final String fieldName) { + final OptionalInt idx = getFieldIndex(fieldName); + return idx.isPresent() ? 
Optional.of(fields.get(idx.getAsInt()).getDataType()) : Optional.empty(); + } + + @Override + public Optional<RecordField> getField(final String fieldName) { + final OptionalInt indexOption = getFieldIndex(fieldName); + if (indexOption.isPresent()) { + return Optional.of(fields.get(indexOption.getAsInt())); + } + + return Optional.empty(); + } + + private OptionalInt getFieldIndex(final String fieldName) { + final Integer index = fieldIndices.get(fieldName); + return index == null ? OptionalInt.empty() : OptionalInt.of(index); + } + + @Override + public boolean equals(final Object obj) { + if (obj == null) { + return false; + } + if (obj == this) { + return true; + } + if (!(obj instanceof RecordSchema)) { + return false; + } + + final RecordSchema other = (RecordSchema) obj; + return fields.equals(other.getFields()); + } + + @Override + public int hashCode() { + return 143 + 3 * fields.hashCode(); + } + + private static String createText(final List<RecordField> fields) { + final StringBuilder sb = new StringBuilder("["); + + for (int i = 0; i < fields.size(); i++) { + final RecordField field = fields.get(i); + + sb.append("\""); + sb.append(field.getFieldName()); + sb.append("\" : \""); + sb.append(field.getDataType()); + sb.append("\""); + + if (i < fields.size() - 1) { + sb.append(", "); + } + } + sb.append("]"); + return sb.toString(); + } + + @Override + public String toString() { + return text; + } + + @Override + public SchemaIdentifier getIdentifier() { + return schemaIdentifier; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/WriteResult.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/WriteResult.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/WriteResult.java new file mode 100644 index 0000000..3fb2741 --- /dev/null +++ 
/**
 * <p>
 * Describes what a {@link RecordSetWriter} wrote to an OutputStream: how many records were written
 * and which attributes should be added to the FlowFile. Instances are usually obtained from
 * {@link WriteResult#of(int, Map)} or by using the shared {@link WriteResult#EMPTY} instance.
 * </p>
 *
 * <p>
 * PLEASE NOTE: This interface is still considered 'unstable' and may change in a non-backward-compatible
 * manner between minor or incremental releases of NiFi.
 * </p>
 */
public interface WriteResult {

    /**
     * A result that reports zero records written and an empty attribute map.
     */
    WriteResult EMPTY = of(0, Collections.emptyMap());

    /**
     * @return the number of records written
     */
    int getRecordCount();

    /**
     * @return values that should be added to the FlowFile as attributes
     */
    Map<String, String> getAttributes();

    /**
     * Builds a WriteResult that simply reports the two given values back to the caller.
     * The attribute map is held by reference; it is not copied.
     *
     * @param recordCount the number of records written
     * @param attributes the attributes to add to the FlowFile
     * @return A {@link WriteResult} representing the given parameters
     */
    static WriteResult of(final int recordCount, final Map<String, String> attributes) {
        final WriteResult result = new WriteResult() {
            @Override
            public Map<String, String> getAttributes() {
                return attributes;
            }

            @Override
            public int getRecordCount() {
                return recordCount;
            }
        };

        return result;
    }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.serialization.record; + +import java.util.Objects; + +public class DataType { + private final RecordFieldType fieldType; + private final String format; + + protected DataType(final RecordFieldType fieldType, final String format) { + this.fieldType = fieldType; + this.format = format; + } + + public String getFormat() { + return format; + } + + public RecordFieldType getFieldType() { + return fieldType; + } + + @Override + public int hashCode() { + return 31 + 41 * getFieldType().hashCode() + 41 * (getFormat() == null ? 0 : getFormat().hashCode()); + } + + @Override + public boolean equals(final Object obj) { + if (obj == this) { + return true; + } + if (obj == null) { + return false; + } + if (!(obj instanceof DataType)) { + return false; + } + + final DataType other = (DataType) obj; + return getFieldType().equals(other.getFieldType()) && Objects.equals(getFormat(), other.getFormat()); + } + + @Override + public String toString() { + if (getFormat() == null) { + return getFieldType().toString(); + } else { + return getFieldType().toString() + ":" + getFormat(); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ListRecordSet.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ListRecordSet.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ListRecordSet.java new file mode 100644 index 
0000000..3968f50 --- /dev/null +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ListRecordSet.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.serialization.record; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +public class ListRecordSet implements RecordSet { + private final Iterator<Record> recordItr; + private final RecordSchema schema; + + public ListRecordSet(final RecordSchema schema, final List<Record> records) { + this.schema = schema; + + final List<Record> copy = new ArrayList<>(records); + recordItr = copy.iterator(); + } + + @Override + public RecordSchema getSchema() { + return schema; + } + + @Override + public Record next() { + return recordItr.hasNext() ? recordItr.next() : null; + } +}
