[hotfix] [nifi] Minor style cleanups in NiFi source

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b08b64ab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b08b64ab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b08b64ab

Branch: refs/heads/master
Commit: b08b64abdb7c9bd7946e9c36e63ec368a1ac5032
Parents: 38362c4
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jun 7 19:23:56 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jun 8 15:17:10 2016 +0200

----------------------------------------------------------------------
 .../streaming/connectors/nifi/NiFiSource.java   | 99 ++++++++++----------
 1 file changed, 49 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b08b64ab/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
 
b/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
index 00b6921..57c59ec 100644
--- 
a/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
+++ 
b/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.nifi;
 
 import org.apache.flink.api.common.functions.StoppableFunction;
@@ -26,6 +27,7 @@ import org.apache.nifi.remote.client.SiteToSiteClient;
 import org.apache.nifi.remote.client.SiteToSiteClientConfig;
 import org.apache.nifi.remote.protocol.DataPacket;
 import org.apache.nifi.stream.io.StreamUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,13 +42,20 @@ import java.util.Map;
  */
 public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> 
implements StoppableFunction{
 
+       private static final long serialVersionUID = 1L;
+
        private static final Logger LOG = 
LoggerFactory.getLogger(NiFiSource.class);
 
        private static final long DEFAULT_WAIT_TIME_MS = 1000;
 
-       private long waitTimeMs;
-       private SiteToSiteClient client;
-       private SiteToSiteClientConfig clientConfig;
+       // 
------------------------------------------------------------------------
+
+       private final SiteToSiteClientConfig clientConfig;
+
+       private final long waitTimeMs;
+
+       private transient SiteToSiteClient client;
+
        private volatile boolean isRunning = true;
 
        /**
@@ -73,63 +82,58 @@ public class NiFiSource extends 
RichParallelSourceFunction<NiFiDataPacket> imple
        public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                client = new 
SiteToSiteClient.Builder().fromConfig(clientConfig).build();
-               isRunning = true;
        }
 
        @Override
        public void run(SourceContext<NiFiDataPacket> ctx) throws Exception {
-               try {
-                       while (isRunning) {
-                               final Transaction transaction = 
client.createTransaction(TransferDirection.RECEIVE);
-                               if (transaction == null) {
-                                       LOG.warn("A transaction could not be 
created, waiting and will try again...");
-                                       try {
-                                               Thread.sleep(waitTimeMs);
-                                       } catch (InterruptedException ignored) {
-
-                                       }
-                                       continue;
+               while (isRunning) {
+                       final Transaction transaction = 
client.createTransaction(TransferDirection.RECEIVE);
+                       if (transaction == null) {
+                               LOG.warn("A transaction could not be created, 
waiting and will try again...");
+                               try {
+                                       Thread.sleep(waitTimeMs);
+                               } catch (InterruptedException ignored) {
+
                                }
+                               continue;
+                       }
 
-                               DataPacket dataPacket = transaction.receive();
-                               if (dataPacket == null) {
-                                       transaction.confirm();
-                                       transaction.complete();
+                       DataPacket dataPacket = transaction.receive();
+                       if (dataPacket == null) {
+                               transaction.confirm();
+                               transaction.complete();
 
-                                       LOG.debug("No data available to pull, 
waiting and will try again...");
-                                       try {
-                                               Thread.sleep(waitTimeMs);
-                                       } catch (InterruptedException ignored) {
+                               LOG.debug("No data available to pull, waiting 
and will try again...");
+                               try {
+                                       Thread.sleep(waitTimeMs);
+                               } catch (InterruptedException ignored) {
 
-                                       }
-                                       continue;
                                }
+                               continue;
+                       }
 
-                               final List<NiFiDataPacket> niFiDataPackets = 
new ArrayList<>();
-                               do {
-                                       // Read the data into a byte array and 
wrap it along with the attributes
-                                       // into a NiFiDataPacket.
-                                       final InputStream inStream = 
dataPacket.getData();
-                                       final byte[] data = new byte[(int) 
dataPacket.getSize()];
-                                       StreamUtils.fillBuffer(inStream, data);
-
-                                       final Map<String, String> attributes = 
dataPacket.getAttributes();
+                       final List<NiFiDataPacket> niFiDataPackets = new 
ArrayList<>();
+                       do {
+                               // Read the data into a byte array and wrap it 
along with the attributes
+                               // into a NiFiDataPacket.
+                               final InputStream inStream = 
dataPacket.getData();
+                               final byte[] data = new byte[(int) 
dataPacket.getSize()];
+                               StreamUtils.fillBuffer(inStream, data);
 
-                                       niFiDataPackets.add(new 
StandardNiFiDataPacket(data, attributes));
-                                       dataPacket = transaction.receive();
-                               } while (dataPacket != null);
+                               final Map<String, String> attributes = 
dataPacket.getAttributes();
 
-                               // Confirm transaction to verify the data
-                               transaction.confirm();
+                               niFiDataPackets.add(new 
StandardNiFiDataPacket(data, attributes));
+                               dataPacket = transaction.receive();
+                       } while (dataPacket != null);
 
-                               for (NiFiDataPacket dp : niFiDataPackets) {
-                                       ctx.collect(dp);
-                               }
+                       // Confirm transaction to verify the data
+                       transaction.confirm();
 
-                               transaction.complete();
+                       for (NiFiDataPacket dp : niFiDataPackets) {
+                               ctx.collect(dp);
                        }
-               } finally {
-                       ctx.close();
+
+                       transaction.complete();
                }
        }
 
@@ -144,11 +148,6 @@ public class NiFiSource extends 
RichParallelSourceFunction<NiFiDataPacket> imple
                client.close();
        }
 
- /**
-       * {@inheritDoc}
-       * <p>
-       * Sets the {@link #isRunning} flag to {@code false}.
-       */
        @Override
        public void stop() {
                this.isRunning = false;

Reply via email to