http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/DatagramChannelReader.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/DatagramChannelReader.java b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/DatagramChannelReader.java index 1eb5c7e..db76279 100644 --- a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/DatagramChannelReader.java +++ b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/DatagramChannelReader.java @@ -23,10 +23,6 @@ import java.nio.channels.SelectionKey; import org.apache.nifi.io.nio.consumer.StreamConsumerFactory; -/** - * - * @author none - */ public final class DatagramChannelReader extends AbstractChannelReader { public static final int MAX_UDP_PACKET_SIZE = 65507; @@ -39,10 +35,10 @@ public final class DatagramChannelReader extends AbstractChannelReader { * Will receive UDP data from channel and won't receive anything unless the * given buffer has enough space for at least one full max udp packet. * - * @param key - * @param buffer - * @return - * @throws IOException + * @param key selection key + * @param buffer to fill + * @return bytes read + * @throws IOException if error filling buffer from channel */ @Override protected int fillBuffer(final SelectionKey key, final ByteBuffer buffer) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/SocketChannelReader.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/SocketChannelReader.java b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/SocketChannelReader.java index db2c102..29c2973 100644 --- a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/SocketChannelReader.java +++ b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/SocketChannelReader.java @@ -22,10 +22,6 @@ import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import org.apache.nifi.io.nio.consumer.StreamConsumerFactory; -/** - * - * @author none - */ public final class SocketChannelReader extends AbstractChannelReader { public SocketChannelReader(final String id, final SelectionKey key, final BufferPool empties, final StreamConsumerFactory consumerFactory) { @@ -35,10 +31,10 @@ public final class SocketChannelReader extends AbstractChannelReader { /** * Receives TCP data from the socket channel for the given key. * - * @param key - * @param buffer - * @return - * @throws IOException + * @param key selection key + * @param buffer byte buffer to fill + * @return bytes read + * @throws IOException if error reading bytes */ @Override protected int fillBuffer(final SelectionKey key, final ByteBuffer buffer) throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumer.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumer.java b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumer.java index d75b7d7..cac8d8b 100644 --- a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumer.java +++ b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumer.java @@ -25,7 +25,6 @@ import org.apache.nifi.io.nio.BufferPool; * thread providing data to process and another thread that is processing that * data. * - * @author none */ public interface StreamConsumer { @@ -36,7 +35,7 @@ public interface StreamConsumer { * associated add to this given queue. If not, buffers will run out and all * stream processing will halt. READ THIS!!! * - * @param returnQueue + * @param returnQueue pool of buffers to use */ void setReturnBufferQueue(BufferPool returnQueue); @@ -45,7 +44,7 @@ public interface StreamConsumer { * data to be processed. If the consumer is finished this should simply * return the given buffer to the return buffer queue (after it is cleared) * - * @param buffer + * @param buffer filled buffer */ void addFilledBuffer(ByteBuffer buffer); @@ -53,7 +52,8 @@ public interface StreamConsumer { * Will be called by the thread that executes the consumption of data. May * be called many times though once <code>isConsumerFinished</code> returns * true this method will likely do nothing. - * @throws java.io.IOException + * + * @throws java.io.IOException if there is an issue processing */ void process() throws IOException; @@ -66,14 +66,14 @@ public interface StreamConsumer { * If true signals the consumer is done consuming data and will not process * any more buffers. * - * @return + * @return true if finished */ boolean isConsumerFinished(); /** * Uniquely identifies the consumer * - * @return + * @return identifier of consumer */ String getId(); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SSLContextFactory.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SSLContextFactory.java b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SSLContextFactory.java index 7ed5ad4..9c6cb82 100644 --- a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SSLContextFactory.java +++ b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SSLContextFactory.java @@ -81,12 +81,12 @@ public class SSLContextFactory { * * * @return a SSLContext instance - * @throws java.security.KeyStoreException - * @throws java.io.IOException - * @throws java.security.NoSuchAlgorithmException - * @throws java.security.cert.CertificateException - * @throws java.security.UnrecoverableKeyException - * @throws java.security.KeyManagementException + * @throws java.security.KeyStoreException if problem with keystore + * @throws java.io.IOException if unable to create context + * @throws java.security.NoSuchAlgorithmException if algorithm isn't known + * @throws java.security.cert.CertificateException if certificate is invalid + * @throws java.security.UnrecoverableKeyException if the key cannot be recovered + * @throws java.security.KeyManagementException if the key is improper */ public SSLContext createSslContext() throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException, UnrecoverableKeyException, KeyManagementException { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/ServerSocketConfiguration.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/ServerSocketConfiguration.java b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/ServerSocketConfiguration.java index fc279fb..d6aca92 100644 --- a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/ServerSocketConfiguration.java +++ b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/ServerSocketConfiguration.java @@ -26,9 +26,6 @@ import java.security.cert.CertificateException; import javax.net.ssl.SSLContext; -/** - * @author unattributed - */ public final class ServerSocketConfiguration { private boolean needClientAuth; @@ -40,7 +37,8 @@ public final class ServerSocketConfiguration { public ServerSocketConfiguration() { } - public SSLContext createSSLContext() throws KeyManagementException, NoSuchAlgorithmException, UnrecoverableKeyException, KeyStoreException, CertificateException, FileNotFoundException, IOException { + public SSLContext createSSLContext() + throws KeyManagementException, NoSuchAlgorithmException, UnrecoverableKeyException, KeyStoreException, CertificateException, FileNotFoundException, IOException { return sslContextFactory == null ? null : sslContextFactory.createSslContext(); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketConfiguration.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketConfiguration.java b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketConfiguration.java index c24b540..8b803dc 100644 --- a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketConfiguration.java +++ b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketConfiguration.java @@ -26,9 +26,6 @@ import java.security.cert.CertificateException; import javax.net.ssl.SSLContext; -/** - * @author unattributed - */ public final class SocketConfiguration { private Integer socketTimeout; @@ -41,7 +38,8 @@ public final class SocketConfiguration { private Integer trafficClass; private SSLContextFactory sslContextFactory; - public SSLContext createSSLContext() throws KeyManagementException, NoSuchAlgorithmException, UnrecoverableKeyException, KeyStoreException, CertificateException, FileNotFoundException, IOException { + public SSLContext createSSLContext() + throws KeyManagementException, NoSuchAlgorithmException, UnrecoverableKeyException, KeyStoreException, CertificateException, FileNotFoundException, IOException { return sslContextFactory == null ? null : sslContextFactory.createSslContext(); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketUtils.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketUtils.java b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketUtils.java index fb6a00c..27d676a 100644 --- a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketUtils.java +++ b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketUtils.java @@ -35,9 +35,6 @@ import org.apache.nifi.logging.NiFiLog; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * @author unattributed - */ public final class SocketUtils { private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(SocketUtils.class)); @@ -99,7 +96,8 @@ public final class SocketUtils { return socket; } - public static ServerSocket createServerSocket(final int port, final ServerSocketConfiguration config) throws IOException, KeyManagementException, UnrecoverableKeyException, NoSuchAlgorithmException, KeyStoreException, CertificateException { + public static ServerSocket createServerSocket(final int port, final ServerSocketConfiguration config) + throws IOException, KeyManagementException, UnrecoverableKeyException, NoSuchAlgorithmException, KeyStoreException, CertificateException { if (config == null) { throw new NullPointerException("Configuration may not be null."); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastListener.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastListener.java b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastListener.java index e562c25..1ce2ea0 100644 --- a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastListener.java +++ b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastListener.java @@ -35,7 +35,6 @@ import org.slf4j.LoggerFactory; * then the message is wrapped with a MulticastProtocolMessage before being sent * to the originator. * - * @author unattributed */ public abstract class MulticastListener { @@ -80,8 +79,8 @@ public abstract class MulticastListener { * Implements the action to perform when a new datagram is received. This * class must not close the multicast socket. * - * @param multicastSocket - * @param packet the datagram socket + * @param multicastSocket socket + * @param packet the datagram packet */ public abstract void dispatchRequest(final MulticastSocket multicastSocket, final DatagramPacket packet); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/TCPClient.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/TCPClient.java b/nifi/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/TCPClient.java index b3d214e..447d701 100644 --- a/nifi/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/TCPClient.java +++ b/nifi/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/TCPClient.java @@ -78,7 +78,8 @@ public class TCPClient { for (int i = 0; i < 1000; i++) { sock.getOutputStream().write(bytes); totalBytes += bytes.length; - } sock.getOutputStream().flush(); + } + sock.getOutputStream().flush(); } logger.info("Total bytes sent: " + totalBytes + " to port " + port); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java b/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java index 5b20b93..5c8b4c8 100644 --- a/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java +++ b/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java @@ -72,7 +72,7 @@ import org.slf4j.LoggerFactory; * updates for a given Record at any one time. * </p> * - * @param <T> + * @param <T> type of record this WAL is for */ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepository<T> { @@ -113,14 +113,12 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor * @param paths a sorted set of Paths to use for the partitions/journals and * the snapshot. The snapshot will always be written to the first path * specified. - * * @param partitionCount the number of partitions/journals to use. For best * performance, this should be close to the number of threads that are * expected to update the repository simultaneously - * - * @param serde - * @param syncListener - * @throws IOException + * @param serde the serializer/deserializer for records + * @param syncListener the listener + * @throws IOException if unable to initialize due to IO issue */ @SuppressWarnings("unchecked") public MinimalLockingWriteAheadLog(final SortedSet<Path> paths, final int partitionCount, final SerDe<T> serde, final SyncListener syncListener) throws IOException { @@ -209,7 +207,9 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor while (true) { final int numBlackListed = numberBlackListedPartitions.get(); if (numBlackListed >= partitions.length) { - throw new IOException("All Partitions have been blacklisted due to failures when attempting to update. If the Write-Ahead Log is able to perform a checkpoint, this issue may resolve itself. Otherwise, manual intervention will be required."); + throw new IOException("All Partitions have been blacklisted due to " + + "failures when attempting to update. If the Write-Ahead Log is able to perform a checkpoint, " + + "this issue may resolve itself. Otherwise, manual intervention will be required."); } final long partitionIdx = partitionIndex.getAndIncrement(); @@ -248,7 +248,9 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor } else if (updateType == UpdateType.SWAP_OUT) { final String newLocation = serde.getLocation(record); if (newLocation == null) { - logger.error("Received Record (ID=" + recordIdentifier + ") with UpdateType of SWAP_OUT but no indicator of where the Record is to be Swapped Out to; these records may be lost when the repository is restored!"); + logger.error("Received Record (ID=" + recordIdentifier + ") with UpdateType of SWAP_OUT but " + + "no indicator of where the Record is to be Swapped Out to; these records may be " + + "lost when the repository is restored!"); } else { recordMap.remove(recordIdentifier); this.externalLocations.add(newLocation); @@ -256,7 +258,9 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor } else if (updateType == UpdateType.SWAP_IN) { final String newLocation = serde.getLocation(record); if (newLocation == null) { - logger.error("Received Record (ID=" + recordIdentifier + ") with UpdateType of SWAP_IN but no indicator of where the Record is to be Swapped In from; these records may be duplicated when the repository is restored!"); + logger.error("Received Record (ID=" + recordIdentifier + ") with UpdateType of SWAP_IN but no " + + "indicator of where the Record is to be Swapped In from; these records may be duplicated " + + "when the repository is restored!"); } else { externalLocations.remove(newLocation); } @@ -345,11 +349,13 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor final int waliImplementationVersion = dataIn.readInt(); if (!waliImplementationClass.equals(MinimalLockingWriteAheadLog.class.getName())) { - throw new IOException("Write-Ahead Log located at " + snapshotPath + " was written using the " + waliImplementationClass + " class; cannot restore using " + getClass().getName()); + throw new IOException("Write-Ahead Log located at " + snapshotPath + " was written using the " + + waliImplementationClass + " class; cannot restore using " + getClass().getName()); } if (waliImplementationVersion > getVersion()) { - throw new IOException("Write-Ahead Log located at " + snapshotPath + " was written using version " + waliImplementationVersion + " of the " + waliImplementationClass + " class; cannot restore using Version " + getVersion()); + throw new IOException("Write-Ahead Log located at " + snapshotPath + " was written using version " + + waliImplementationVersion + " of the " + waliImplementationClass + " class; cannot restore using Version " + getVersion()); } dataIn.readUTF(); // ignore serde class name for now @@ -380,7 +386,8 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor } this.recoveredExternalLocations.addAll(swapLocations); - logger.debug("{} restored {} Records and {} Swap Files from Snapshot, ending with Transaction ID {}", new Object[]{this, numRecords, recoveredExternalLocations.size(), maxTransactionId}); + logger.debug("{} restored {} Records and {} Swap Files from Snapshot, ending with Transaction ID {}", + new Object[]{this, numRecords, recoveredExternalLocations.size(), maxTransactionId}); return maxTransactionId; } } @@ -390,10 +397,9 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor * if recovery of a Partition requires the Write-Ahead Log be checkpointed * before modification. * - * @param modifiableRecordMap - * @param maxTransactionIdRestored - * @return - * @throws IOException + * @param modifiableRecordMap map + * @param maxTransactionIdRestored index of max restored transaction + * @throws IOException if unable to recover from edits */ private void recoverFromEdits(final Map<Object, T> modifiableRecordMap, final Long maxTransactionIdRestored) throws IOException { final Map<Object, T> updateMap = new HashMap<>(); @@ -422,7 +428,8 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor try { partition.recoverNextTransaction(ignorableMap, updateMap, ignorableSwapLocations); } catch (final EOFException e) { - logger.error("{} unexpectedly reached End of File while reading from {} for Transaction {}; assuming crash and ignoring this transaction.", + logger.error("{} unexpectedly reached End of File while reading from {} for Transaction {}; " + + "assuming crash and ignoring this transaction.", new Object[]{this, partition, transactionId}); } } @@ -442,7 +449,8 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor modifiableRecordMap.remove(id); } } catch (final EOFException e) { - logger.error("{} unexpectedly reached End-of-File when reading from {} for Transaction ID {}; assuming crash and ignoring this transaction", + logger.error("{} unexpectedly reached End-of-File when reading from {} for Transaction ID {}; " + + "assuming crash and ignoring this transaction", new Object[]{this, nextPartition, firstTransactionId}); } @@ -452,7 +460,8 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor try { subsequentTransactionId = nextPartition.getNextRecoverableTransactionId(); } catch (final IOException e) { - logger.error("{} unexpectedly found End-of-File when reading from {} for Transaction ID {}; assuming crash and ignoring this transaction", + logger.error("{} unexpectedly found End-of-File when reading from {} for Transaction ID {}; " + + "assuming crash and ignoring this transaction", new Object[]{this, nextPartition, firstTransactionId}); } @@ -576,7 +585,8 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor final long partitionMillis = TimeUnit.MILLISECONDS.convert(partitionEnd - partitionStart, TimeUnit.NANOSECONDS); final long stopTheWorldMillis = TimeUnit.NANOSECONDS.toMillis(stopTheWorldNanos); - logger.info("{} checkpointed with {} Records and {} Swap Files in {} milliseconds (Stop-the-world time = {} milliseconds, Clear Edit Logs time = {} millis), max Transaction ID {}", + logger.info("{} checkpointed with {} Records and {} Swap Files in {} milliseconds (Stop-the-world " + + "time = {} milliseconds, Clear Edit Logs time = {} millis), max Transaction ID {}", new Object[]{this, records.size(), swapLocations.size(), millis, stopTheWorldMillis, partitionMillis, maxTransactionId}); return records.size(); @@ -605,9 +615,9 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor * * All methods with the exceptions of {@link #claim()}, {@link #tryClaim()}, * and {@link #releaseClaim()} in this Partition MUST be called while - * holding the claim (via {@link #claim} or {@link #tryClaim()). + * holding the claim (via {@link #claim} or {@link #tryClaim()}). * - * @param <S> + * @param <S> type of record held in the partitions */ private static class Partition<S> { @@ -703,7 +713,7 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor * Closes resources pointing to the current journal and begins writing * to a new one * - * @throws IOException + * @throws IOException if failure to rollover */ public void rollover() throws IOException { lock.lock(); @@ -777,7 +787,8 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor if (isJournalFile(file)) { paths.add(file.toPath()); } else { - logger.warn("Found file {}, but could not access it, or it was not in the expected format; will ignore this file", file.getAbsolutePath()); + logger.warn("Found file {}, but could not access it, or it was not in the expected format; " + + "will ignore this file", file.getAbsolutePath()); } } @@ -836,7 +847,8 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor return true; } - public void update(final Collection<S> records, final long transactionId, final Map<Object, S> recordMap, final boolean forceSync) throws IOException { + public void update(final Collection<S> records, final long transactionId, final Map<Object, S> recordMap, final boolean forceSync) + throws IOException { if (this.closed) { throw new IllegalStateException("Partition is closed"); } @@ -889,7 +901,8 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor final long waliVersion = recoveryIn.readInt(); if (waliVersion > writeAheadLogVersion) { - throw new IOException("Cannot recovery from file " + nextRecoveryPath + " because it was written using WALI version " + waliVersion + ", but the version used to restore it is only " + writeAheadLogVersion); + throw new IOException("Cannot recovery from file " + nextRecoveryPath + " because it was written using " + + "WALI version " + waliVersion + ", but the version used to restore it is only " + writeAheadLogVersion); } @SuppressWarnings("unused") @@ -936,7 +949,8 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor final Path nextRecoveryPath = this.recoveryFiles.poll(); if (nextRecoveryPath != null) { - throw new IllegalStateException("Signaled to end recovery, but there are more recovery files for Partition in directory " + editDirectory); + throw new IllegalStateException("Signaled to end recovery, but there are more recovery files for Partition " + + "in directory " + editDirectory); } final Path newEditPath = getNewEditPath(); @@ -999,7 +1013,7 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor /** * Must be called after recovery has finished * - * @return + * @return max recovered transaction id */ public long getMaxRecoveredTransactionId() { return maxTransactionId.get(); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SerDe.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SerDe.java b/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SerDe.java index bbc7efb..cc984a6 100644 --- a/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SerDe.java +++ b/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SerDe.java @@ -35,10 +35,10 @@ public interface SerDe<T> { * {@link DataOutputStream}. * </p> * - * @param previousRecordState - * @param newRecordState - * @param out - * @throws IOException + * @param previousRecordState previous state + * @param newRecordState new state + * @param out stream to write to + * @throws IOException if fail during write */ void serializeEdit(T previousRecordState, T newRecordState, DataOutputStream out) throws IOException; @@ -48,9 +48,9 @@ public interface SerDe<T> { * {@link DataOutputStream}. * </p> * - * @param record - * @param out - * @throws IOException + * @param record to serialize + * @param out to write to + * @throws IOException if failed to write */ void serializeRecord(T record, DataOutputStream out) throws IOException; @@ -63,13 +63,13 @@ public interface SerDe<T> { * This method must never return <code>null</code>. * </p> * - * @param in + * @param in to deserialize from * @param currentRecordStates an unmodifiable map of Record ID's to the * current state of that record * @param version the version of the SerDe that was used to serialize the * edit record - * @return - * @throws IOException + * @return deserialized record + * @throws IOException if failure reading */ T deserializeEdit(DataInputStream in, Map<Object, T> currentRecordStates, int version) throws IOException; @@ -79,27 +79,27 @@ public interface SerDe<T> { * record. If no data is available, returns <code>null</code>. * </p> * - * @param in + * @param in stream to read from * @param version the version of the SerDe that was used to serialize the * record - * @return - * @throws IOException + * @return record + * @throws IOException failure reading */ T deserializeRecord(DataInputStream in, int version) throws IOException; /** * Returns the unique ID for the given record * - * @param record - * @return + * @param record to obtain identifier for + * @return identifier of record */ Object getRecordIdentifier(T record); /** * Returns the UpdateType for the given record * - * @param record - * @return + * @param record to retrieve update type for + * @return update type */ UpdateType getUpdateType(T record); @@ -112,8 +112,8 @@ public interface SerDe<T> { * WALI with a record of type {@link UpdateType#CREATE} that indicates a * Location of file://tmp/external1 * - * @param record - * @return + * @param record to get location of + * @return location */ String getLocation(T record); @@ -122,7 +122,7 @@ public interface SerDe<T> { * when serializing/deserializing the edit logs so that if the version * changes, we are still able to deserialize old versions * - * @return + * @return version */ int getVersion(); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/WriteAheadRepository.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/WriteAheadRepository.java b/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/WriteAheadRepository.java index 4567872..7f0e828 100644 --- a/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/WriteAheadRepository.java +++ b/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/WriteAheadRepository.java @@ -48,7 +48,7 @@ import java.util.Set; * one partition or may allow many partitions. * </p> * - * @param <T> + * @param <T> the type of Record this repository is for */ public interface WriteAheadRepository<T> { @@ -63,7 +63,7 @@ public interface WriteAheadRepository<T> { * to be flushed to disk. If false, the data may be stored in Operating * System buffers, which improves performance but could cause loss of data * if power is lost or the Operating System crashes - * @throws IOException + * @throws IOException if failure to update repo * @throws IllegalArgumentException if multiple records within the given * Collection have the same ID, as specified by {@link Record#getId()} * method @@ -78,8 +78,8 @@ public interface WriteAheadRepository<T> { * before any updates are issued to the Repository. * </p> * - * @return - * @throws IOException + * @return recovered records + * @throws IOException if failure to read from repo * @throws IllegalStateException if any updates have been issued against * this Repository before this method is invoked */ @@ -92,8 +92,8 @@ public interface WriteAheadRepository<T> { * BEFORE {@link update}. * </p> * - * @return - * @throws IOException + * @return swap location + * @throws IOException if failure reading swap locations */ Set<String> getRecoveredSwapLocations() throws IOException; @@ -107,7 +107,7 @@ public interface WriteAheadRepository<T> { * * * @return the number of records that were written to the new snapshot - * @throws java.io.IOException + * @throws java.io.IOException if failure during checkpoint */ int checkpoint() throws IOException; @@ -116,7 +116,7 @@ public interface WriteAheadRepository<T> { * Causes the repository to checkpoint and then close any open resources. * </p> * - * @throws IOException + * @throws IOException if failure to shutdown cleanly */ void shutdown() throws IOException; } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceInitializationContext.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceInitializationContext.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceInitializationContext.java index fd3c2de..bff1d62 100644 --- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceInitializationContext.java +++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceInitializationContext.java @@ -29,7 +29,7 @@ public class MockControllerServiceInitializationContext extends MockControllerSe public MockControllerServiceInitializationContext(final ControllerService controllerService, final String identifier) { this(controllerService, identifier, new MockProcessorLog(identifier, controllerService)); } - + public MockControllerServiceInitializationContext(final ControllerService controllerService, final String identifier, final ComponentLog logger) { this.identifier = identifier; this.logger = logger; @@ -40,17 +40,17 @@ public class MockControllerServiceInitializationContext extends MockControllerSe public String getIdentifier() { return identifier; } - + @Override public String getControllerServiceName(final String serviceIdentifier) { - return null; + return null; } @Override public ControllerServiceLookup getControllerServiceLookup() { return this; } - + @Override public ComponentLog getLogger() { return logger; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceLookup.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceLookup.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceLookup.java index 2734440..219ee24 100644 --- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceLookup.java +++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceLookup.java @@ -40,17 +40,17 @@ public abstract class MockControllerServiceLookup implements ControllerServiceLo public void removeControllerService(ControllerService service) { final ControllerService canonical = getControllerService(service.getIdentifier()); - if ( canonical == null || canonical != service ) { + if (canonical == null || canonical != service) { throw new IllegalArgumentException("Controller Service " + service + " is not known"); } - + controllerServiceMap.remove(service.getIdentifier()); } protected void addControllerServices(final MockControllerServiceLookup other) { this.controllerServiceMap.putAll(other.controllerServiceMap); } - + protected ControllerServiceConfiguration getConfiguration(final String identifier) { return controllerServiceMap.get(identifier); } @@ -80,7 +80,7 @@ public abstract class MockControllerServiceLookup implements ControllerServiceLo public boolean isControllerServiceEnabling(final String serviceIdentifier) { return false; } - + @Override public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) { final Set<String> ids = new HashSet<>(); @@ -91,10 +91,10 @@ public abstract class MockControllerServiceLookup implements ControllerServiceLo } return ids; } - + @Override public String getControllerServiceName(String serviceIdentifier) { - final ControllerServiceConfiguration status = controllerServiceMap.get(serviceIdentifier); - return status == null ? null : serviceIdentifier; + final ControllerServiceConfiguration status = controllerServiceMap.get(serviceIdentifier); + return status == null ? null : serviceIdentifier; } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java index 1be0293..e9fb9d6 100644 --- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java +++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java @@ -185,8 +185,8 @@ public class MockFlowFile implements FlowFile { * Asserts that the content of this FlowFile is the same as the content of * the given file * - * @param file - * @throws IOException + * @param file to compare content against + * @throws IOException if fails doing IO during comparison */ public void assertContentEquals(final File file) throws IOException { assertContentEquals(file.toPath()); @@ -196,8 +196,8 @@ public class MockFlowFile implements FlowFile { * Asserts that the content of this FlowFile is the same as the content of * the given path * - * @param path - * @throws IOException + * @param path where to find content to compare to + * @throws IOException if io error occurs while comparing content */ public void assertContentEquals(final Path path) throws IOException { try (final InputStream in = Files.newInputStream(path, StandardOpenOption.READ)) { @@ -209,8 +209,8 @@ public class MockFlowFile implements FlowFile { * Asserts that the content of this FlowFile is the same as the content of * the given byte array * - * @param data - * @throws IOException + * @param data the data to compare + * @throws IOException if any ioe occurs while reading flowfile */ public void assertContentEquals(final byte[] data) throws IOException { try (final InputStream in = new ByteArrayInputStream(data)) { @@ -236,8 +236,8 @@ public class MockFlowFile implements FlowFile { * the given InputStream. This method closes the InputStream when it is * finished. * - * @param in - * @throws IOException + * @param in the stream to source comparison data from + * @throws IOException if any issues reading from given source */ public void assertContentEquals(final InputStream in) throws IOException { int bytesRead = 0; @@ -264,9 +264,7 @@ public class MockFlowFile implements FlowFile { } /** - * Returns a copy of the the contents of the FlowFile as a byte array - * - * @return + * @return a copy of the the contents of the FlowFile as a byte array */ public byte[] toByteArray() { return Arrays.copyOf(this.data, this.data.length); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java index 6536928..20a2f7c 100644 --- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java +++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java @@ -54,7 +54,7 @@ public class MockProcessContext extends MockControllerServiceLookup implements S /** * Creates a new MockProcessContext for the given Processor * - * @param component + * @param component being mocked */ public MockProcessContext(final ConfigurableComponent component) { this.component = Objects.requireNonNull(component); @@ -73,7 +73,7 @@ public class MockProcessContext extends MockControllerServiceLookup implements S // do nothing...the service is being loaded } } - + @Override public PropertyValue getProperty(final PropertyDescriptor descriptor) { return getProperty(descriptor.getName()); @@ -107,9 +107,9 @@ public class MockProcessContext extends MockControllerServiceLookup implements S * either case, the ValidationResult is returned, indicating whether or not * the property is valid * - * @param descriptor - * @param value - * @return + * @param descriptor of property to modify + * @param value new value + * @return result */ public ValidationResult setProperty(final PropertyDescriptor descriptor, final String value) { requireNonNull(descriptor); @@ -154,7 +154,6 @@ public class MockProcessContext extends MockControllerServiceLookup implements S config.setProperties(properties); config.setAnnotationData(annotationData); } - @Override public int getMaxConcurrentTasks() { @@ -268,10 +267,10 @@ public class MockProcessContext extends MockControllerServiceLookup implements S } public Set<Relationship> getAvailableRelationships() { - if ( !(component instanceof Processor) ) { + if (!(component instanceof Processor)) { return Collections.emptySet(); } - + final Set<Relationship> relationships = new HashSet<>(((Processor) component).getRelationships()); relationships.removeAll(unavailableRelationships); return relationships; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java index 83c75c6..e9bb778 100644 --- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java +++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java @@ -329,7 +329,6 @@ public class MockProcessSession implements ProcessSession { return newFlowFile; } - @Override public MockFlowFile merge(final Collection<FlowFile> sources, final FlowFile destination) { for (final FlowFile source : sources) { @@ -676,11 +675,9 @@ public class MockProcessSession implements ProcessSession { } /** - * Returns a List of FlowFiles in the order in which they were transferred + * @param relationship to get flowfiles for + * @return a List of FlowFiles in the order in which they were transferred * to the given relationship - * - * @param relationship - * @return */ public List<MockFlowFile> getFlowFilesForRelationship(final String relationship) { final Relationship procRel = new Relationship.Builder().name(relationship).build(); @@ -783,7 +780,7 @@ public class MockProcessSession implements ProcessSession { * will capture the uuid of a certain number of source objects and may not * capture all of them. How many it will capture is unspecified. * - * @param sources + * @param sources to inherit common attributes from */ private FlowFile inheritAttributes(final Collection<FlowFile> sources, final FlowFile destination) { final StringBuilder parentUuidBuilder = new StringBuilder(); @@ -883,8 +880,8 @@ public class MockProcessSession implements ProcessSession { * Assert that the number of FlowFiles transferred to the given relationship * is equal to the given count * - * @param relationship - * @param count + * @param relationship to validate transfer count of + * @param count items transfer to given relationship */ public void assertTransferCount(final Relationship relationship, final int count) { final int transferCount = getFlowFilesForRelationship(relationship).size(); @@ -896,8 +893,8 @@ public class MockProcessSession implements ProcessSession { * Assert that the number of FlowFiles transferred to the given relationship * is equal to the given count * - * @param relationship - * @param count + * @param relationship to validate transfer count of + * @param count items transfer to given relationship */ public void assertTransferCount(final String relationship, final int count) { assertTransferCount(new Relationship.Builder().name(relationship).build(), count); @@ -921,7 +918,7 @@ public class MockProcessSession implements ProcessSession { * Asserts that all FlowFiles that were transferred were transferred to the * given relationship * - * @param relationship + * @param relationship to check for transferred flow files */ public void assertAllFlowFilesTransferred(final String relationship) { assertAllFlowFilesTransferred(new Relationship.Builder().name(relationship).build()); @@ -931,7 +928,7 @@ public class MockProcessSession implements ProcessSession { * Asserts that all FlowFiles that were transferred were transferred to the * given relationship * - * @param relationship + * @param relationship to validate */ public void assertAllFlowFilesTransferred(final Relationship relationship) { for (final Map.Entry<Relationship, List<MockFlowFile>> entry : transferMap.entrySet()) { @@ -956,8 +953,8 @@ public class MockProcessSession implements ProcessSession { * given relationship and that the number of FlowFiles transferred is equal * to <code>count</code> * - * @param relationship - * @param count + * @param relationship to validate + * @param count number of items sent to that relationship (expected) */ public void assertAllFlowFilesTransferred(final Relationship relationship, final int count) { assertAllFlowFilesTransferred(relationship); @@ -969,17 +966,15 @@ public class MockProcessSession implements ProcessSession { * given relationship and that the number of FlowFiles transferred is equal * to <code>count</code> * - * @param relationship - * @param count + * @param relationship to validate + * @param count number of items sent to that relationship (expected) */ public void assertAllFlowFilesTransferred(final String relationship, final int count) { assertAllFlowFilesTransferred(new Relationship.Builder().name(relationship).build(), count); } /** - * Returns the number of FlowFiles that were removed - * - * @return + * @return the number of FlowFiles that were removed */ public int getRemovedCount() { return removedCount; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java index 0aa2749..2e5d3eb 100644 --- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java +++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java @@ -64,9 +64,9 @@ public class MockProcessorInitializationContext implements ProcessorInitializati @Override public String getControllerServiceName(String serviceIdentifier) { - return context.getControllerServiceName(serviceIdentifier); + return context.getControllerServiceName(serviceIdentifier); } - + @Override public boolean isControllerServiceEnabled(String serviceIdentifier) { return context.isControllerServiceEnabled(serviceIdentifier); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorLog.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorLog.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorLog.java index 5505e88..837784b 100644 --- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorLog.java +++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorLog.java @@ -66,21 +66,11 @@ public class MockProcessorLog implements ProcessorLog { return (os != null && os.length > 0 && (os[os.length - 1] instanceof Throwable)); } - /** - * - * @param msg - * @param t - */ @Override public void warn(final String msg, final Throwable t) { warn("{} " + msg, new Object[]{component}, t); } - /** - * - * @param msg - * @param os - */ @Override public void warn(String msg, Object[] os) { if (lastArgIsException(os)) { @@ -92,12 +82,6 @@ public class MockProcessorLog implements ProcessorLog { } } - /** - * - * @param msg - * @param os - * @param t - */ @Override public void warn(String msg, Object[] os, final Throwable t) { os = addProcessorAndThrowable(os, t); @@ -109,21 +93,12 @@ public class MockProcessorLog implements ProcessorLog { } } - /** - * - * @param msg - */ @Override public void warn(String msg) { msg = "{} " + msg; logger.warn(msg, component); } - /** - * - * @param msg - * @param t - */ @Override public void trace(String msg, Throwable t) { msg = "{} " + msg; @@ -131,11 +106,6 @@ public class MockProcessorLog implements ProcessorLog { logger.trace(msg, os, t); } - /** - * - * @param msg - * @param os - */ @Override public void trace(String msg, Object[] os) { msg = "{} " + msg; @@ -143,10 +113,6 @@ public class MockProcessorLog implements ProcessorLog { logger.trace(msg, os); } - /** - * - * @param msg - */ @Override public void trace(String msg) { msg = "{} " + msg; @@ -154,12 +120,6 @@ public class MockProcessorLog implements ProcessorLog { logger.trace(msg, os); } - /** - * - * @param msg - * @param os - * @param t - */ @Override public void trace(String msg, Object[] os, Throwable t) { os = addProcessorAndThrowable(os, t); @@ -169,56 +129,31 @@ public class MockProcessorLog implements ProcessorLog { logger.trace("", t); } - /** - * - * @return - */ @Override public boolean isWarnEnabled() { return logger.isWarnEnabled(); } - /** - * - * @return - */ @Override public boolean isTraceEnabled() { return logger.isTraceEnabled(); } - /** - * - * @return - */ @Override public boolean isInfoEnabled() { return logger.isInfoEnabled(); } - /** - * - * @return - */ @Override public boolean isErrorEnabled() { return logger.isErrorEnabled(); } - /** - * - * @return - */ @Override public boolean isDebugEnabled() { return logger.isDebugEnabled(); } - /** - * - * @param msg - * @param t - */ @Override public void info(String msg, Throwable t) { msg = "{} " + msg; @@ -230,11 +165,6 @@ public class MockProcessorLog implements ProcessorLog { } } - /** - * - * @param msg - * @param os - */ @Override public void info(String msg, Object[] os) { msg = "{} " + msg; @@ -243,10 +173,6 @@ public class MockProcessorLog implements ProcessorLog { logger.info(msg, os); } - /** - * - * @param msg - */ @Override public void info(String msg) { msg = "{} " + msg; @@ -255,12 +181,6 @@ public class MockProcessorLog implements ProcessorLog { logger.info(msg, os); } - /** - * - * @param msg - * @param os - * @param t - */ @Override public void info(String msg, Object[] os, Throwable t) { os = addProcessorAndThrowable(os, t); @@ -272,20 +192,11 @@ public class MockProcessorLog implements ProcessorLog { } } - /** - * - * @return - */ @Override public String getName() { return logger.getName(); } - /** - * - * @param msg - * @param t - */ @Override public void error(String msg, Throwable t) { msg = "{} " + msg; @@ -297,11 +208,6 @@ public class MockProcessorLog implements ProcessorLog { } } - /** - * - * @param msg - * @param os - */ @Override public void error(String msg, Object[] os) { if (lastArgIsException(os)) { @@ -313,10 +219,6 @@ public class MockProcessorLog implements ProcessorLog { } } - /** - * - * @param msg - */ @Override public void error(String msg) { msg = "{} " + msg; @@ -325,12 +227,6 @@ public class MockProcessorLog implements ProcessorLog { logger.error(msg, os); } - /** - * - * @param msg - * @param os - * @param t - */ @Override public void error(String msg, Object[] os, Throwable t) { os = addProcessorAndThrowable(os, t); @@ -342,11 +238,6 @@ public class MockProcessorLog implements ProcessorLog { } } - /** - * - * @param msg - * @param t - */ @Override public void debug(String msg, Throwable t) { msg = "{} " + msg; @@ -355,11 +246,6 @@ public class MockProcessorLog implements ProcessorLog { logger.debug(msg, os, t); } - /** - * - * @param msg - * @param os - */ @Override public void debug(String msg, Object[] os) { os = addProcessor(os); @@ -368,12 +254,6 @@ public class MockProcessorLog implements ProcessorLog { logger.debug(msg, os); } - /** - * - * @param msg - * @param os - * @param t - */ @Override public void debug(String msg, Object[] os, Throwable t) { os = addProcessorAndThrowable(os, t); @@ -385,10 +265,6 @@ public class MockProcessorLog implements ProcessorLog { } } - /** - * - * @param msg - */ @Override public void debug(String msg) { msg = "{} " + msg; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java index 3451f12..097eafd 100644 --- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java +++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java @@ -88,7 +88,7 @@ public class MockProvenanceReporter implements ProvenanceReporter { public void fork(FlowFile parent, java.util.Collection<FlowFile> children, String details, long forkDuration) { } - + @Override public void join(Collection<FlowFile> parents, FlowFile child) { @@ -108,7 +108,7 @@ public class MockProvenanceReporter implements ProvenanceReporter { public void join(java.util.Collection<FlowFile> parents, FlowFile child, String details, long joinDuration) { } - + @Override public void clone(FlowFile parent, FlowFile child) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java index ca4350c..63a9876 100644 --- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java +++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java @@ -100,11 +100,9 @@ public class MockReportingContext extends MockControllerServiceLookup implements } /** - * Returns all Bulletins that have been created for the component with the + * @param componentId identifier of component to get bulletins for + * @return all Bulletins that have been created for the component with the * given ID - * - * @param componentId - * @return */ public List<Bulletin> getComponentBulletins(final String componentId) { final List<Bulletin> created = componentBulletinsCreated.get(componentId); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingInitializationContext.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingInitializationContext.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingInitializationContext.java index 7cabef2..0aea00a 100644 --- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingInitializationContext.java +++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingInitializationContext.java @@ -81,7 +81,7 @@ public class MockReportingInitializationContext extends MockControllerServiceLoo public SchedulingStrategy getSchedulingStrategy() { return SchedulingStrategy.TIMER_DRIVEN; } - + @Override public ComponentLog getLogger() { return logger; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java index c9b1cda..d73a09b 100644 --- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java +++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java @@ -38,10 +38,10 @@ public class MockValidationContext implements ValidationContext, ControllerServi public MockValidationContext(final MockProcessContext processContext) { this.context = processContext; - + final Map<PropertyDescriptor, String> properties = processContext.getProperties(); expressionLanguageSupported = new HashMap<>(properties.size()); - for ( final PropertyDescriptor descriptor : properties.keySet() ) { + for (final PropertyDescriptor descriptor : properties.keySet()) { expressionLanguageSupported.put(descriptor.getName(), descriptor.isExpressionLanguageSupported()); } } @@ -101,13 +101,13 @@ public class MockValidationContext implements ValidationContext, ControllerServi public boolean isControllerServiceEnabled(final ControllerService service) { return context.isControllerServiceEnabled(service); } - + @Override public String getControllerServiceName(final String serviceIdentifier) { - final ControllerServiceConfiguration configuration = context.getConfiguration(serviceIdentifier); - return configuration == null ? null : serviceIdentifier; + final ControllerServiceConfiguration configuration = context.getConfiguration(serviceIdentifier); + return configuration == null ? null : serviceIdentifier; } - + @Override public boolean isValidationRequired(final ControllerService service) { return true; @@ -117,16 +117,16 @@ public class MockValidationContext implements ValidationContext, ControllerServi public boolean isControllerServiceEnabling(String serviceIdentifier) { return context.isControllerServiceEnabling(serviceIdentifier); } - + public boolean isExpressionLanguagePresent(final String value) { - if ( value == null ) { + if (value == null) { return false; } - + final List<Range> elRanges = Query.extractExpressionRanges(value); return (elRanges != null && !elRanges.isEmpty()); } - + @Override public boolean isExpressionLanguageSupported(final String propertyName) { final Boolean supported = expressionLanguageSupported.get(propertyName); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-mock/src/main/java/org/apache/nifi/util/ReflectionUtils.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/ReflectionUtils.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/ReflectionUtils.java index 9d52eb3..940eeea 100644 --- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/ReflectionUtils.java +++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/ReflectionUtils.java @@ -34,14 +34,15 @@ public class ReflectionUtils { * are supplied by the <code>args</code> parameter than needed, the extra * arguments will be ignored. * - * @param annotation - * @param instance - * @param args - * @throws InvocationTargetException - * @throws IllegalArgumentException - * @throws IllegalAccessException + * @param annotation the annotation to look for + * @param instance to invoke a method of + * @param args to supply in a method call + * @throws InvocationTargetException ite + * @throws IllegalArgumentException iae + * @throws IllegalAccessException if not allowed to invoke that method */ - public static void invokeMethodsWithAnnotation(final Class<? extends Annotation> annotation, final Object instance, final Object... args) throws IllegalAccessException, IllegalArgumentException, InvocationTargetException { + public static void invokeMethodsWithAnnotation(final Class<? extends Annotation> annotation, final Object instance, final Object... args) + throws IllegalAccessException, IllegalArgumentException, InvocationTargetException { for (final Method method : instance.getClass().getMethods()) { if (method.isAnnotationPresent(annotation)) { final boolean isAccessible = method.isAccessible(); @@ -90,9 +91,9 @@ public class ReflectionUtils { * are supplied by the <code>args</code> parameter than needed, the extra * arguments will be ignored. * - * @param annotation - * @param instance - * @param args + * @param annotation the annotation to look for + * @param instance to invoke a method of + * @param args to supply in a method call * @return <code>true</code> if all appropriate methods were invoked and * returned without throwing an Exception, <code>false</code> if one of the * methods threw an Exception or could not be invoked; if <code>false</code> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java index 13a87de..65d79a6 100644 --- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java +++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java @@ -32,7 +32,6 @@ public class SharedSessionState { private final AtomicLong flowFileIdGenerator; private final ConcurrentMap<String, AtomicLong> counterMap = new ConcurrentHashMap<>(); - public SharedSessionState(final Processor processor, final AtomicLong flowFileIdGenerator) { flowFileQueue = new MockFlowFileQueue(); provenanceReporter = new MockProvenanceReporter(); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java index d66ed81..7048cfe 100644 --- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java +++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java @@ -86,14 +86,14 @@ public class StandardProcessorTestRunner implements TestRunner { private static final Logger logger = LoggerFactory.getLogger(StandardProcessorTestRunner.class); private static final Set<Class<? extends Annotation>> deprecatedTypeAnnotations = new HashSet<>(); private static final Set<Class<? extends Annotation>> deprecatedMethodAnnotations = new HashSet<>(); - + static { // do this in a separate method, just so that we can add a @SuppressWarnings annotation // because we want to indicate explicitly that we know that we are using deprecated // classes here. populateDeprecatedMethods(); } - + StandardProcessorTestRunner(final Processor processor) { this.processor = processor; this.idGenerator = new AtomicLong(0L); @@ -103,7 +103,7 @@ public class StandardProcessorTestRunner implements TestRunner { this.context = new MockProcessContext(processor); detectDeprecatedAnnotations(processor); - + final MockProcessorInitializationContext mockInitContext = new MockProcessorInitializationContext(processor, context); processor.initialize(mockInitContext); @@ -126,7 +126,7 @@ public class StandardProcessorTestRunner implements TestRunner { deprecatedTypeAnnotations.add(org.apache.nifi.processor.annotation.TriggerWhenEmpty.class); deprecatedTypeAnnotations.add(org.apache.nifi.processor.annotation.TriggerWhenAnyDestinationAvailable.class); deprecatedTypeAnnotations.add(org.apache.nifi.processor.annotation.TriggerSerially.class); - + deprecatedMethodAnnotations.add(org.apache.nifi.processor.annotation.OnRemoved.class); deprecatedMethodAnnotations.add(org.apache.nifi.processor.annotation.OnAdded.class); deprecatedMethodAnnotations.add(org.apache.nifi.processor.annotation.OnScheduled.class); @@ -134,24 +134,24 @@ public class StandardProcessorTestRunner implements TestRunner { deprecatedMethodAnnotations.add(org.apache.nifi.processor.annotation.OnStopped.class); deprecatedMethodAnnotations.add(org.apache.nifi.processor.annotation.OnUnscheduled.class); } - + private static void detectDeprecatedAnnotations(final Processor processor) { - for ( final Class<? extends Annotation> annotationClass : deprecatedTypeAnnotations ) { - if ( processor.getClass().isAnnotationPresent(annotationClass) ) { + for (final Class<? extends Annotation> annotationClass : deprecatedTypeAnnotations) { + if (processor.getClass().isAnnotationPresent(annotationClass)) { Assert.fail("Processor is using deprecated Annotation " + annotationClass.getCanonicalName()); } } - - for ( final Class<? extends Annotation> annotationClass : deprecatedMethodAnnotations ) { - for ( final Method method : processor.getClass().getMethods() ) { - if ( method.isAnnotationPresent(annotationClass) ) { + + for (final Class<? extends Annotation> annotationClass : deprecatedMethodAnnotations) { + for (final Method method : processor.getClass().getMethods()) { + if (method.isAnnotationPresent(annotationClass)) { Assert.fail("Processor is using deprecated Annotation " + annotationClass.getCanonicalName() + " for method " + method); } } } - + } - + @Override public void setValidateExpressionUsage(final boolean validate) { context.setValidateExpressionUsage(validate); @@ -181,7 +181,7 @@ public class StandardProcessorTestRunner implements TestRunner { public void run(final int iterations, final boolean stopOnFinish) { run(iterations, stopOnFinish, true); } - + @Override public void run(final int iterations, final boolean stopOnFinish, final boolean initialize) { if (iterations < 1) { @@ -191,7 +191,7 @@ public class StandardProcessorTestRunner implements TestRunner { context.assertValid(); context.enableExpressionValidation(); try { - if ( initialize ) { + if (initialize) { try { ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, processor, context); } catch (Exception e) { @@ -519,7 +519,7 @@ public class StandardProcessorTestRunner implements TestRunner { // Assert.fail("Controller Service " + service + " is using deprecated Annotation " + org.apache.nifi.controller.annotation.OnConfigured.class + " for method " + method); // } // } - + final ComponentLog logger = new MockProcessorLog(identifier, service); final MockControllerServiceInitializationContext initContext = new MockControllerServiceInitializationContext(requireNonNull(service), requireNonNull(identifier), logger); service.initialize(initContext); @@ -538,66 +538,64 @@ public class StandardProcessorTestRunner implements TestRunner { context.addControllerService(identifier, service, resolvedProps, null); } - @Override public void assertNotValid(final ControllerService service) { final ValidationContext validationContext = new MockValidationContext(context).getControllerServiceValidationContext(service); final Collection<ValidationResult> results = context.getControllerService(service.getIdentifier()).validate(validationContext); - - for ( final ValidationResult result : results ) { - if ( !result.isValid() ) { + + for (final ValidationResult result : results) { + if (!result.isValid()) { return; } } - + Assert.fail("Expected Controller Service " + service + " to be invalid but it is valid"); } - + @Override public void assertValid(final ControllerService service) { final ValidationContext validationContext = new MockValidationContext(context).getControllerServiceValidationContext(service); final Collection<ValidationResult> results = context.getControllerService(service.getIdentifier()).validate(validationContext); - - for ( final ValidationResult result : results ) { - if ( !result.isValid() ) { + + for (final ValidationResult result : results) { + if (!result.isValid()) { Assert.fail("Expected Controller Service to be valid but it is invalid due to: " + result.toString()); } } } - - + @Override public void disableControllerService(final ControllerService service) { final ControllerServiceConfiguration configuration = context.getConfiguration(service.getIdentifier()); - if ( configuration == null ) { + if (configuration == null) { throw new IllegalArgumentException("Controller Service " + service + " is not known"); } - - if ( !configuration.isEnabled() ) { + + if (!configuration.isEnabled()) { throw new IllegalStateException("Controller service " + service + " cannot be disabled because it is not enabled"); } - + try { ReflectionUtils.invokeMethodsWithAnnotation(OnDisabled.class, service); } catch (final Exception e) { e.printStackTrace(); Assert.fail("Failed to disable Controller Service " + service + " due to " + e); } - + configuration.setEnabled(false); } - + @Override public void enableControllerService(final ControllerService service) { final ControllerServiceConfiguration configuration = context.getConfiguration(service.getIdentifier()); - if ( configuration == null ) { + if (configuration == null) { throw new IllegalArgumentException("Controller Service " + service + " is not known"); } - - if ( configuration.isEnabled() ) { + + if (configuration.isEnabled()) { throw new IllegalStateException("Cannot enable Controller Service " + service + " because it is not disabled"); } - + try { final ConfigurationContext configContext = new MockConfigurationContext(configuration.getProperties(), context); ReflectionUtils.invokeMethodsWithAnnotation(OnEnabled.class, service, configContext); @@ -609,87 +607,86 @@ public class StandardProcessorTestRunner implements TestRunner { Assert.fail("Failed to enable Controller Service " + service + " due to " + e); } - configuration.setEnabled(true); + configuration.setEnabled(true); } - + @Override public boolean isControllerServiceEnabled(final ControllerService service) { final ControllerServiceConfiguration configuration = context.getConfiguration(service.getIdentifier()); - if ( configuration == null ) { + if (configuration == null) { throw new IllegalArgumentException("Controller Service " + service + " is not known"); } return configuration.isEnabled(); } - + @Override public void removeControllerService(final ControllerService service) { disableControllerService(service); - + try { ReflectionUtils.invokeMethodsWithAnnotation(OnRemoved.class, service); } catch (final Exception e) { e.printStackTrace(); Assert.fail("Failed to remove Controller Service " + service + " due to " + e); } - + context.removeControllerService(service); } - + @Override public void setAnnotationData(final ControllerService service, final String annotationData) { final ControllerServiceConfiguration configuration = getConfigToUpdate(service); configuration.setAnnotationData(annotationData); } - + private ControllerServiceConfiguration getConfigToUpdate(final ControllerService service) { final ControllerServiceConfiguration configuration = context.getConfiguration(service.getIdentifier()); - if ( configuration == null ) { + if (configuration == null) { throw new IllegalArgumentException("Controller Service " + service + " is not known"); } - - if ( configuration.isEnabled() ) { + + if (configuration.isEnabled()) { throw new IllegalStateException("Controller service " + service + " cannot be modified because it is not disabled"); } - + return configuration; } - + @Override public ValidationResult setProperty(final ControllerService service, final PropertyDescriptor property, final AllowableValue value) { return setProperty(service, property, value.getValue()); } - + @Override public ValidationResult setProperty(final ControllerService service, final PropertyDescriptor property, final String value) { final ControllerServiceConfiguration configuration = getConfigToUpdate(service); final Map<PropertyDescriptor, String> curProps = configuration.getProperties(); final Map<PropertyDescriptor, String> updatedProps = new HashMap<>(curProps); - + final ValidationContext validationContext = new MockValidationContext(context).getControllerServiceValidationContext(service); final ValidationResult validationResult = property.validate(value, validationContext); - + updatedProps.put(property, value); configuration.setProperties(updatedProps); - + return validationResult; } - + @Override public ValidationResult setProperty(final ControllerService service, final String propertyName, final String value) { final PropertyDescriptor descriptor = service.getPropertyDescriptor(propertyName); - if ( descriptor == null ) { + if (descriptor == null) { return new ValidationResult.Builder() - .input(propertyName) - .explanation(propertyName + " is not a known Property for Controller Service " + service) - .subject("Invalid property") - .valid(false) - .build(); + .input(propertyName) + .explanation(propertyName + " is not a known Property for Controller Service " + service) + .subject("Invalid property") + .valid(false) + .build(); } return setProperty(service, descriptor, value); } - - + @Override public ControllerService getControllerService(final String identifier) { return context.getControllerService(identifier);
