http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
new file mode 100644
index 0000000..65e6d4e
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.testutils;
+
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
+import 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+
+import java.nio.ByteBuffer;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Factory for different kinds of fake Kinesis behaviours using the {@link KinesisProxyInterface} interface.
+ */
+public class FakeKinesisBehavioursFactory {
+
+       // ------------------------------------------------------------------------
+       //  Behaviours related to shard listing and resharding, used in KinesisDataFetcherTest
+       // ------------------------------------------------------------------------
+
+       /**
+        * Creates a fake Kinesis whose shard listing never reports any shards.
+        *
+        * <p>The shard iterator and record fetching calls of the returned proxy both return {@code null}.
+        *
+        * @return a proxy whose {@code getShardList} result has no retrieved shards set
+        */
+       public static KinesisProxyInterface noShardsFoundForRequestedStreamsBehaviour() {
+               final KinesisProxyInterface shardlessKinesis = new KinesisProxyInterface() {
+                       @Override
+                       public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) {
+                               return null;
+                       }
+
+                       @Override
+                       public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
+                               return null;
+                       }
+
+                       @Override
+                       public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) {
+                               // the result is deliberately left without any retrieved shards
+                               GetShardListResult resultWithoutShards = new GetShardListResult();
+                               return resultWithoutShards;
+                       }
+               };
+
+               return shardlessKinesis;
+       }
+
+       /**
+        * Creates a fake Kinesis backed by {@code NonReshardedStreamsKinesis} for the given streams.
+        *
+        * @param streamsToShardCount number of shards of each stream, keyed by stream name
+        * @return the fake Kinesis proxy
+        */
+       public static KinesisProxyInterface nonReshardedStreamsBehaviour(Map<String,Integer> streamsToShardCount) {
+               final KinesisProxyInterface fixedShardsKinesis = new NonReshardedStreamsKinesis(streamsToShardCount);
+               return fixedShardsKinesis;
+       }
+
+       // ------------------------------------------------------------------------
+       //  Behaviours related to fetching records, used mainly in ShardConsumerTest
+       // ------------------------------------------------------------------------
+
+       /**
+        * Creates a fake single-shard Kinesis that serves {@code numOfRecords} records in total,
+        * split over {@code numOfGetRecordsCalls} calls to {@code getRecords}.
+        *
+        * @param numOfRecords total number of records held by the fake shard
+        * @param numOfGetRecordsCalls number of {@code getRecords} calls over which the records are split
+        * @return the fake Kinesis proxy
+        */
+       public static KinesisProxyInterface totalNumOfRecordsAfterNumOfGetRecordsCalls(final int numOfRecords, final int numOfGetRecordsCalls) {
+               final KinesisProxyInterface fixedRecordsKinesis =
+                       new SingleShardEmittingFixNumOfRecordsKinesis(numOfRecords, numOfGetRecordsCalls);
+               return fixedRecordsKinesis;
+       }
+       
+       /**
+        * Creates a fake single-shard Kinesis like {@code totalNumOfRecordsAfterNumOfGetRecordsCalls},
+        * except that one {@code getRecords} attempt fails with an artificial expired iterator exception.
+        *
+        * @param numOfRecords total number of records held by the fake shard
+        * @param numOfGetRecordsCall number of {@code getRecords} calls over which the records are split
+        * @param orderOfCallToExpire order of the {@code getRecords} call that fails with the expired iterator
+        * @return the fake Kinesis proxy
+        */
+       public static KinesisProxyInterface totalNumOfRecordsAfterNumOfGetRecordsCallsWithUnexpectedExpiredIterator(
+               final int numOfRecords, final int numOfGetRecordsCall, final int orderOfCallToExpire) {
+               final KinesisProxyInterface expiringKinesis =
+                       new SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis(
+                               numOfRecords, numOfGetRecordsCall, orderOfCallToExpire);
+               return expiringKinesis;
+       }
+
+       /**
+        * Variant of {@code SingleShardEmittingFixNumOfRecordsKinesis} that throws an artificial
+        * {@link ExpiredIteratorException} exactly once, on the {@code orderOfCallToExpire}-th
+        * {@code getRecords} attempt, and afterwards insists that the shard iterator is requested
+        * again through {@code getShardIterator} before any further records are handed out.
+        */
+       public static class SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis extends SingleShardEmittingFixNumOfRecordsKinesis {
+
+               /** Order (starting at 1) of the getRecords call on which the iterator is reported as expired. */
+               private final int orderOfCallToExpire;
+
+               /** Set once the artificial expired iterator exception has been thrown. */
+               private boolean expiredOnceAlready = false;
+
+               /** Set once getShardIterator was called again after the artificial expiry. */
+               private boolean expiredIteratorRefreshed = false;
+
+               public SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis(
+                               final int numOfRecords,
+                               final int numOfGetRecordsCalls,
+                               final int orderOfCallToExpire) {
+                       super(numOfRecords, numOfGetRecordsCalls);
+                       checkArgument(orderOfCallToExpire <= numOfGetRecordsCalls,
+                               "can not test unexpected expired iterator if orderOfCallToExpire is larger than numOfGetRecordsCalls");
+                       this.orderOfCallToExpire = orderOfCallToExpire;
+               }
+
+               @Override
+               public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
+                       // shard iterators of this fake are the string forms of the batch indices
+                       final int batchIndex = Integer.parseInt(shardIterator);
+
+                       if (!expiredOnceAlready && batchIndex == orderOfCallToExpire - 1) {
+                               // the expiry is faked a single time, at the configured attempt order
+                               expiredOnceAlready = true;
+                               throw new ExpiredIteratorException("Artificial expired shard iterator");
+                       }
+
+                       if (expiredOnceAlready && !expiredIteratorRefreshed) {
+                               // the caller kept using the expired iterator instead of refreshing it:
+                               // fail hard so the test exercising this behaviour notices
+                               throw new RuntimeException("expired shard iterator was not refreshed on the next getRecords() call");
+                       }
+
+                       // regular path: identical to the non-expiring fake
+                       return super.getRecords(shardIterator, maxRecordsToGet);
+               }
+
+               @Override
+               public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) {
+                       if (expiredOnceAlready) {
+                               // being asked again after the artificial expiry counts as the iterator refresh;
+                               // hand out the iterator of the batch whose fetch attempt has failed
+                               expiredIteratorRefreshed = true;
+                               return String.valueOf(orderOfCallToExpire - 1);
+                       }
+
+                       // before the expiry, always start at the first batch of records
+                       return "0";
+               }
+       }
+
+       /**
+        * Fake Kinesis with a single shard that serves a fixed total number of records, split into
+        * one batch per {@code getRecords} call.
+        *
+        * <p>Shard iterators are the string forms of the batch indices, starting at {@code "0"}. The
+        * next shard iterator returned together with the last batch is {@code null}.
+        */
+       private static class SingleShardEmittingFixNumOfRecordsKinesis implements KinesisProxyInterface {
+
+               protected final int totalNumOfGetRecordsCalls;
+
+               protected final int totalNumOfRecords;
+
+               /** Record batches keyed by the shard iterator that fetches them. */
+               protected final Map<String,List<Record>> shardItrToRecordBatch;
+
+               /**
+                * @param numOfRecords total number of records to serve; must not be negative
+                * @param numOfGetRecordsCalls number of batches the records are split into; must be positive
+                * @throws IllegalArgumentException if one of the arguments is out of range
+                */
+               public SingleShardEmittingFixNumOfRecordsKinesis(final int numOfRecords, final int numOfGetRecordsCalls) {
+                       checkArgument(numOfRecords >= 0, "numOfRecords must not be negative");
+                       // also guards the division below against a division by zero
+                       checkArgument(numOfGetRecordsCalls > 0, "numOfGetRecordsCalls must be larger than 0");
+
+                       this.totalNumOfRecords = numOfRecords;
+                       this.totalNumOfGetRecordsCalls = numOfGetRecordsCalls;
+
+                       // initialize the record batches that will be fetched
+                       this.shardItrToRecordBatch = new HashMap<>();
+
+                       int numOfAlreadyPartitionedRecords = 0;
+                       int numOfRecordsPerBatch = numOfRecords/numOfGetRecordsCalls + 1;
+                       for (int batch = 0; batch < totalNumOfGetRecordsCalls; batch++) {
+                               // The last batch takes whatever is left. Every other batch takes
+                               // numOfRecordsPerBatch records, capped at the total: without the cap, more than
+                               // totalNumOfRecords records would be created when there are many calls compared
+                               // to records (e.g. 10 records over 10 calls).
+                               final int endOfBatch = (batch == totalNumOfGetRecordsCalls - 1)
+                                       ? totalNumOfRecords
+                                       : Math.min(numOfAlreadyPartitionedRecords + numOfRecordsPerBatch, totalNumOfRecords);
+
+                               shardItrToRecordBatch.put(
+                                       String.valueOf(batch),
+                                       createRecordBatchWithRange(numOfAlreadyPartitionedRecords, endOfBatch));
+                               numOfAlreadyPartitionedRecords = endOfBatch;
+                       }
+               }
+
+               @Override
+               public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
+                       // assuming that the maxRecordsToGet is always large enough
+                       final int batchIndex = Integer.parseInt(shardIterator);
+                       final boolean isLastBatch = (batchIndex == totalNumOfGetRecordsCalls - 1);
+
+                       return new GetRecordsResult()
+                               .withRecords(shardItrToRecordBatch.get(shardIterator))
+                               .withNextShardIterator(isLastBatch ? null : String.valueOf(batchIndex + 1)); // last next shard iterator is null
+               }
+
+               @Override
+               public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) {
+                       // this will be called only one time per ShardConsumer;
+                       // so, simply return the iterator of the first batch of records
+                       return "0";
+               }
+
+               @Override
+               public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) {
+                       return null;
+               }
+
+               /**
+                * Creates one record per integer in {@code [min, max)}. The integer is used both as the
+                * record data (in its string form) and as the sequence number; the partition key is a
+                * random UUID and the approximate arrival timestamp is the creation time.
+                *
+                * @param min first value of the range, inclusive
+                * @param max end of the range, exclusive
+                * @return the records in ascending order; empty if {@code min >= max}
+                */
+               public static List<Record> createRecordBatchWithRange(int min, int max) {
+                       List<Record> batch = new ArrayList<>(Math.max(max - min, 0));
+                       for (int i = min; i < max; i++) {
+                               batch.add(
+                                       new Record()
+                                               .withData(ByteBuffer.wrap(String.valueOf(i).getBytes()))
+                                               .withPartitionKey(UUID.randomUUID().toString())
+                                               .withApproximateArrivalTimestamp(new Date(System.currentTimeMillis()))
+                                               .withSequenceNumber(String.valueOf(i)));
+                       }
+                       return batch;
+               }
+
+       }
+
+       /**
+        * Fake Kinesis whose streams each have a fixed list of shards, created up front from the
+        * requested shard counts. Only shard listing is supported; the shard iterator and record
+        * fetching calls return {@code null}.
+        */
+       private static class NonReshardedStreamsKinesis implements KinesisProxyInterface {
+
+               /** Shards of every stream that has at least one shard, keyed by stream name. */
+               private final Map<String, List<KinesisStreamShard>> streamsWithListOfShards = new HashMap<>();
+
+               public NonReshardedStreamsKinesis(Map<String,Integer> streamsToShardCount) {
+                       for (Map.Entry<String,Integer> requestedStream : streamsToShardCount.entrySet()) {
+                               final String stream = requestedStream.getKey();
+                               final int numOfShards = requestedStream.getValue();
+
+                               if (numOfShards == 0) {
+                                       // streams without shards are simply not registered
+                                       continue;
+                               }
+
+                               final List<KinesisStreamShard> shards = new ArrayList<>(numOfShards);
+                               for (int order = 0; order < numOfShards; order++) {
+                                       Shard shard = new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(order));
+                                       shards.add(new KinesisStreamShard(stream, shard));
+                               }
+                               streamsWithListOfShards.put(stream, shards);
+                       }
+               }
+
+               /**
+                * Lists, for every stream of this fake, the shards whose id compares greater than the last
+                * seen shard id given for that stream; all shards are listed for streams that have no last
+                * seen shard id.
+                */
+               @Override
+               public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) {
+                       final GetShardListResult listing = new GetShardListResult();
+
+                       for (Map.Entry<String, List<KinesisStreamShard>> knownStream : streamsWithListOfShards.entrySet()) {
+                               final String stream = knownStream.getKey();
+                               final String lastSeenShardId = streamNamesWithLastSeenShardIds.get(stream);
+
+                               for (KinesisStreamShard candidate : knownStream.getValue()) {
+                                       final boolean notSeenYet = lastSeenShardId == null
+                                               || KinesisStreamShard.compareShardIds(candidate.getShard().getShardId(), lastSeenShardId) > 0;
+                                       if (notSeenYet) {
+                                               listing.addRetrievedShardToStream(stream, candidate);
+                                       }
+                               }
+                       }
+
+                       return listing;
+               }
+
+               @Override
+               public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) {
+                       return null;
+               }
+
+               @Override
+               public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
+                       return null;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisEventsGeneratorProducerThread.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisEventsGeneratorProducerThread.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisEventsGeneratorProducerThread.java
new file mode 100644
index 0000000..fdfdfe1
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisEventsGeneratorProducerThread.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.testutils;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A thread that runs a topology with a manual data generator as source, and the FlinkKinesisProducer as sink.
+ */
+public class KinesisEventsGeneratorProducerThread {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(KinesisEventsGeneratorProducerThread.class);
+
+       /**
+        * Creates a thread that, when run, submits a topology to the Flink cluster at
+        * {@code localhost:flinkPort}: an events generator source (parallelism 1) connected to a
+        * {@link FlinkKinesisProducer} sink that writes to {@code kinesisStreamName}.
+        *
+        * <p>The returned thread is not started by this method. Any exception thrown while building
+        * or executing the topology is logged and stored in {@code errorHandler}.
+        *
+        * @param totalEventCount limit handed to the events generator
+        * @param parallelism parallelism of the topology
+        * @param awsAccessKey AWS access key id set on the producer
+        * @param awsSecretKey AWS secret access key set on the producer
+        * @param awsRegion AWS region set on the producer
+        * @param kinesisStreamName default stream of the producer
+        * @param errorHandler receives the exception if the topology fails
+        * @param flinkPort port of the remote environment on localhost
+        * @param flinkConfig configuration passed to the remote environment
+        * @return the thread wrapping the topology submission
+        */
+       public static Thread create(final int totalEventCount,
+                       final int parallelism,
+                       final String awsAccessKey,
+                       final String awsSecretKey,
+                       final String awsRegion,
+                       final String kinesisStreamName,
+                       final AtomicReference<Throwable> errorHandler,
+                       final int flinkPort,
+                       final Configuration flinkConfig) {
+               final Runnable producingTopology = new Runnable() {
+                       @Override
+                       public void run() {
+                               try {
+                                       final StreamExecutionEnvironment env =
+                                               StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort, flinkConfig);
+                                       env.setParallelism(parallelism);
+
+                                       // the data generator always runs as a single instance
+                                       final DataStream<String> generatedEvents = env
+                                               .addSource(new KinesisEventsGeneratorProducerThread.EventsGenerator(totalEventCount))
+                                               .setParallelism(1);
+
+                                       final Properties awsProps = new Properties();
+                                       awsProps.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, awsAccessKey);
+                                       awsProps.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, awsSecretKey);
+                                       awsProps.setProperty(AWSConfigConstants.AWS_REGION, awsRegion);
+
+                                       final FlinkKinesisProducer<String> kinesisSink =
+                                               new FlinkKinesisProducer<>(new SimpleStringSchema(), awsProps);
+                                       kinesisSink.setFailOnError(true);
+                                       kinesisSink.setDefaultStream(kinesisStreamName);
+                                       kinesisSink.setDefaultPartition("0");
+
+                                       generatedEvents.addSink(kinesisSink);
+
+                                       LOG.info("Starting producing topology");
+                                       env.execute("Producing topology");
+                                       LOG.info("Producing topo finished");
+                               } catch (Exception e) {
+                                       LOG.warn("Error while running producing topology", e);
+                                       // hand the failure over to whoever holds the error handler
+                                       errorHandler.set(e);
+                               }
+                       }
+               };
+
+               return new Thread(producingTopology);
+       }
+
+       /**
+        * Source emitting events of the form {@code <sequence number>-<12 random alphabetic chars>},
+        * sleeping 10 milliseconds before each one, until {@code limit} events were emitted or the
+        * source is cancelled. Nothing is emitted if {@code limit} is not positive.
+        */
+       private static class EventsGenerator implements SourceFunction<String> {
+
+               private static final Logger LOG = LoggerFactory.getLogger(EventsGenerator.class);
+
+               // volatile: the flag is written by cancel() and read by the loop in run(), and those
+               // two methods may be invoked by different threads
+               private volatile boolean running = true;
+               private final long limit;
+
+               /**
+                * @param limit number of events to emit before the source finishes by itself
+                */
+               public EventsGenerator(long limit) {
+                       this.limit = limit;
+               }
+
+               @Override
+               public void run(SourceContext<String> ctx) throws Exception {
+                       long seq = 0;
+                       // checking the limit up front (instead of only after emitting) keeps a
+                       // non-positive limit from producing a stray first event
+                       while (running && seq < limit) {
+                               Thread.sleep(10);
+                               String evt = (seq++) + "-" + RandomStringUtils.randomAlphabetic(12);
+                               ctx.collect(evt);
+                               LOG.info("Emitting event {}", evt);
+                       }
+                       ctx.close();
+                       LOG.info("Stopping events generator");
+               }
+
+               @Override
+               public void cancel() {
+                       running = false;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisShardIdGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisShardIdGenerator.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisShardIdGenerator.java
new file mode 100644
index 0000000..c8dd347
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisShardIdGenerator.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.testutils;
+
+/**
+ * Generates shard ids for tests in the form used by Kinesis.
+ */
+public class KinesisShardIdGenerator {
+       /**
+        * Generates the shard id for the shard of the given order.
+        *
+        * @param order the order of the shard; must not be negative
+        * @return {@code "shardId-"} followed by the order, zero-padded to 12 digits
+        * @throws IllegalArgumentException if {@code order} is negative
+        */
+       public static String generateFromShardOrder(int order) {
+               if (order < 0) {
+                       // a negative order would be formatted with a sign and break the id form
+                       throw new IllegalArgumentException("shard order must not be negative, but was " + order);
+               }
+
+               // Kinesis shards ids are in the form of: ^shardId-\d{12}
+               // Locale.ROOT keeps the digits ASCII regardless of the default locale of the JVM.
+               return String.format(java.util.Locale.ROOT, "shardId-%012d", order);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableFlinkKinesisConsumer.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableFlinkKinesisConsumer.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableFlinkKinesisConsumer.java
new file mode 100644
index 0000000..80ad06c
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableFlinkKinesisConsumer.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.testutils;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.Properties;
+
+/**
+ * A {@link FlinkKinesisConsumer} of strings for tests. Its runtime context is a Mockito mock
+ * that reports a fixed number of parallel subtasks and a fixed index for this subtask.
+ */
+public class TestableFlinkKinesisConsumer extends FlinkKinesisConsumer<String> {
+
+       private final RuntimeContext mockedRuntimeCtx;
+
+       /**
+        * @param fakeStream name of the stream passed to the consumer
+        * @param fakeConfiguration configuration properties passed to the consumer
+        * @param totalNumOfConsumerSubtasks value the mocked context reports as number of parallel subtasks
+        * @param indexOfThisConsumerSubtask value the mocked context reports as index of this subtask
+        */
+       public TestableFlinkKinesisConsumer(
+                       String fakeStream,
+                       Properties fakeConfiguration,
+                       final int totalNumOfConsumerSubtasks,
+                       final int indexOfThisConsumerSubtask) {
+               super(fakeStream, new SimpleStringSchema(), fakeConfiguration);
+
+               final RuntimeContext stubbedCtx = Mockito.mock(RuntimeContext.class);
+
+               // both values are constants, so a plain return stubbing is enough
+               Mockito.when(stubbedCtx.getNumberOfParallelSubtasks()).thenReturn(totalNumOfConsumerSubtasks);
+               Mockito.when(stubbedCtx.getIndexOfThisSubtask()).thenReturn(indexOfThisConsumerSubtask);
+
+               this.mockedRuntimeCtx = stubbedCtx;
+       }
+
+       /**
+        * @return the mocked runtime context created in the constructor
+        */
+       @Override
+       public RuntimeContext getRuntimeContext() {
+               return this.mockedRuntimeCtx;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
new file mode 100644
index 0000000..57886fe
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.testutils;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
+import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A {@link KinesisDataFetcher} of {@code String} records for use in tests. It hands mocked source
+ * and runtime contexts to the superclass and counts emitted records instead of forwarding them.
+ */
+public class TestableKinesisDataFetcher extends KinesisDataFetcher<String> {
+
+       /** Lock passed to the superclass as the checkpoint lock; record emission synchronizes on it. */
+       private static final Object fakeCheckpointLock = new Object();
+
+       /** Number of records passed to {@link #emitRecordAndUpdateState}. */
+       private long numElementsCollected = 0;
+
+       public TestableKinesisDataFetcher(List<String> fakeStreams,
+                                                                         Properties fakeConfiguration,
+                                                                         int fakeTotalCountOfSubtasks,
+                                                                         int fakeTndexOfThisSubtask,
+                                                                         AtomicReference<Throwable> thrownErrorUnderTest,
+                                                                         LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest,
+                                                                         HashMap<String, String> subscribedStreamsToLastDiscoveredShardIdsStateUnderTest,
+                                                                         KinesisProxyInterface fakeKinesis) {
+               super(
+                       fakeStreams,
+                       getMockedSourceContext(),
+                       fakeCheckpointLock,
+                       getMockedRuntimeContext(fakeTotalCountOfSubtasks, fakeTndexOfThisSubtask),
+                       fakeConfiguration,
+                       new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
+                       thrownErrorUnderTest,
+                       subscribedShardsStateUnderTest,
+                       subscribedStreamsToLastDiscoveredShardIdsStateUnderTest,
+                       fakeKinesis);
+       }
+
+       /**
+        * @return how many records have been emitted through this fetcher so far
+        */
+       public long getNumOfElementsCollected() {
+               return this.numElementsCollected;
+       }
+
+       /** Always hands out a fresh wrapper around a new {@link SimpleStringSchema}. */
+       @Override
+       protected KinesisDeserializationSchema<String> getClonedDeserializationSchema() {
+               final SimpleStringSchema stringSchema = new SimpleStringSchema();
+               return new KinesisDeserializationSchemaWrapper<>(stringSchema);
+       }
+
+       /** Counts the record and updates the shard state under the fake checkpoint lock; the record itself is dropped. */
+       @Override
+       protected void emitRecordAndUpdateState(String record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) {
+               synchronized (fakeCheckpointLock) {
+                       numElementsCollected += 1;
+                       updateState(shardStateIndex, lastSequenceNumber);
+               }
+       }
+
+       @SuppressWarnings("unchecked")
+       private static SourceFunction.SourceContext<String> getMockedSourceContext() {
+               return Mockito.mock(SourceFunction.SourceContext.class);
+       }
+
+       /**
+        * Builds a mocked {@link RuntimeContext} that reports the given parallelism and subtask index,
+        * together with a fixed fake task name.
+        */
+       private static RuntimeContext getMockedRuntimeContext(int totalSubtasks, int subtaskIndex) {
+               final RuntimeContext runtimeContext = Mockito.mock(RuntimeContext.class);
+               final String taskNameWithSubtasks = "Fake Task (" + subtaskIndex + "/" + totalSubtasks + ")";
+
+               Mockito.when(runtimeContext.getNumberOfParallelSubtasks()).thenAnswer(constantAnswer(totalSubtasks));
+               Mockito.when(runtimeContext.getIndexOfThisSubtask()).thenAnswer(constantAnswer(subtaskIndex));
+               Mockito.when(runtimeContext.getTaskName()).thenAnswer(constantAnswer("Fake Task"));
+               Mockito.when(runtimeContext.getTaskNameWithSubtasks()).thenAnswer(constantAnswer(taskNameWithSubtasks));
+
+               return runtimeContext;
+       }
+
+       /** Creates an {@link Answer} that returns the same value on every invocation. */
+       private static <V> Answer<V> constantAnswer(final V value) {
+               return new Answer<V>() {
+                       @Override
+                       public V answer(InvocationOnMock invocationOnMock) throws Throwable {
+                               return value;
+                       }
+               };
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-nifi/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-nifi/pom.xml 
b/flink-connectors/flink-connector-nifi/pom.xml
new file mode 100644
index 0000000..e04a63a
--- /dev/null
+++ b/flink-connectors/flink-connector-nifi/pom.xml
@@ -0,0 +1,89 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-connectors</artifactId>
+               <version>1.2-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-connector-nifi_2.10</artifactId>
+       <name>flink-connector-nifi</name>
+
+       <packaging>jar</packaging>
+
+       <!-- Allow users to pass custom connector versions -->
+       <properties>
+               <nifi.version>0.6.1</nifi.version>
+       </properties>
+
+       <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-site-to-site-client</artifactId>
+            <version>${nifi.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_2.10</artifactId>
+            <version>${project.version}</version>
+                       <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_2.10</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-tests_2.10</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils_2.10</artifactId>
+            <version>${project.version}</version>
+                       <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-surefire-plugin</artifactId>
+                               <configuration>
+                                       
<rerunFailingTestsCount>3</rerunFailingTestsCount>
+                               </configuration>
+                       </plugin>
+               </plugins>
+       </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacket.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacket.java
 
b/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacket.java
new file mode 100644
index 0000000..c8ceb57
--- /dev/null
+++ 
b/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacket.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.nifi;
+
+import java.util.Map;
+
+/**
+ * <p>
+ * The NiFiDataPacket provides a packaging around a NiFi FlowFile. It wraps both
+ * a FlowFile's content and its attributes so that they can be processed by Flink.
+ * </p>
+ *
+ * <p>
+ * The content is exposed as a raw byte array and the attributes as a map of
+ * String keys to String values.
+ * </p>
+ */
+public interface NiFiDataPacket {
+
+       /**
+        * Returns the payload of the wrapped FlowFile.
+        *
+        * @return the contents of a NiFi FlowFile
+        */
+       byte[] getContent();
+
+       /**
+        * Returns the attributes of the wrapped FlowFile.
+        *
+        * @return a Map of attributes that are associated with the NiFi FlowFile
+        */
+       Map<String, String> getAttributes();
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacketBuilder.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacketBuilder.java
 
b/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacketBuilder.java
new file mode 100644
index 0000000..9bb521b
--- /dev/null
+++ 
b/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacketBuilder.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.nifi;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+import java.io.Serializable;
+
+/**
+ * A function that can create a NiFiDataPacket from an incoming instance of the given type.
+ *
+ * @param <T> the type of the incoming instances that are turned into NiFiDataPackets
+ */
+public interface NiFiDataPacketBuilder<T> extends Function, Serializable {
+       /**
+        * Creates a NiFiDataPacket for the given instance.
+        *
+        * @param t the incoming instance to convert
+        * @param ctx the runtime context handed to the builder by its caller
+        * @return the NiFiDataPacket created for {@code t}
+        */
+       NiFiDataPacket createNiFiDataPacket(T t, RuntimeContext ctx);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSink.java
 
b/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSink.java
new file mode 100644
index 0000000..abc6b35
--- /dev/null
+++ 
b/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSink.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.nifi;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+
+/**
+ * A sink that delivers data to Apache NiFi using the NiFi Site-to-Site client. The sink requires
+ * a NiFiDataPacketBuilder which can create instances of NiFiDataPacket from the incoming data.
+ *
+ * <p>Every incoming element is sent to NiFi in a Site-to-Site transaction of its own.
+ */
+public class NiFiSink<T> extends RichSinkFunction<T> {
+
+       private static final long serialVersionUID = 1L;
+
+       private final SiteToSiteClientConfig clientConfig;
+       private final NiFiDataPacketBuilder<T> builder;
+
+       /** Built in {@link #open(Configuration)}; it is runtime state and never part of the serialized sink. */
+       private transient SiteToSiteClient client;
+
+       /**
+        * Construct a new NiFiSink with the given client config and NiFiDataPacketBuilder.
+        *
+        * @param clientConfig the configuration for building a NiFi SiteToSiteClient
+        * @param builder a builder to produce NiFiDataPackets from incoming data
+        */
+       public NiFiSink(SiteToSiteClientConfig clientConfig, NiFiDataPacketBuilder<T> builder) {
+               this.clientConfig = clientConfig;
+               this.builder = builder;
+       }
+
+       /**
+        * Builds the Site-to-Site client from the configuration given at construction time.
+        */
+       @Override
+       public void open(Configuration parameters) throws Exception {
+               super.open(parameters);
+               this.client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
+       }
+
+       /**
+        * Converts the value into a NiFiDataPacket and sends it to NiFi in a new transaction.
+        *
+        * @param value the incoming element
+        * @throws IllegalStateException if the client could not create a transaction
+        */
+       @Override
+       public void invoke(T value) throws Exception {
+               final NiFiDataPacket niFiDataPacket = builder.createNiFiDataPacket(value, getRuntimeContext());
+
+               final Transaction transaction = client.createTransaction(TransferDirection.SEND);
+               if (transaction == null) {
+                       throw new IllegalStateException("Unable to create a NiFi Transaction to send data");
+               }
+
+               transaction.send(niFiDataPacket.getContent(), niFiDataPacket.getAttributes());
+               transaction.confirm();
+               transaction.complete();
+       }
+
+       /**
+        * Closes the Site-to-Site client, if one was created.
+        */
+       @Override
+       public void close() throws Exception {
+               try {
+                       super.close();
+               } finally {
+                       // The client is still null when open() failed or was never called.
+                       if (client != null) {
+                               client.close();
+                       }
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
 
b/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
new file mode 100644
index 0000000..57c59ec
--- /dev/null
+++ 
b/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.nifi;
+
+import org.apache.flink.api.common.functions.StoppableFunction;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.stream.io.StreamUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A source that pulls data from Apache NiFi using the NiFi Site-to-Site 
client. This source
+ * produces NiFiDataPackets which encapsulate the content and attributes of a 
NiFi FlowFile.
+ */
+public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> 
implements StoppableFunction{
+
+       private static final long serialVersionUID = 1L;
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(NiFiSource.class);
+
+       private static final long DEFAULT_WAIT_TIME_MS = 1000;
+
+       // 
------------------------------------------------------------------------
+
+       private final SiteToSiteClientConfig clientConfig;
+
+       private final long waitTimeMs;
+
+       private transient SiteToSiteClient client;
+
+       private volatile boolean isRunning = true;
+
+       /**
+        * Constructs a new NiFiSource using the given client config and the 
default wait time of 1000 ms.
+        *
+        * @param clientConfig the configuration for building a NiFi 
SiteToSiteClient
+        */
+       public NiFiSource(SiteToSiteClientConfig clientConfig) {
+               this(clientConfig, DEFAULT_WAIT_TIME_MS);
+       }
+
+       /**
+        * Constructs a new NiFiSource using the given client config and wait 
time.
+        *
+        * @param clientConfig the configuration for building a NiFi 
SiteToSiteClient
+        * @param waitTimeMs the amount of time to wait (in milliseconds) if no 
data is available to pull from NiFi
+        */
+       public NiFiSource(SiteToSiteClientConfig clientConfig, long waitTimeMs) 
{
+               this.clientConfig = clientConfig;
+               this.waitTimeMs = waitTimeMs;
+       }
+
+       @Override
+       public void open(Configuration parameters) throws Exception {
+               super.open(parameters);
+               client = new 
SiteToSiteClient.Builder().fromConfig(clientConfig).build();
+       }
+
+       @Override
+       public void run(SourceContext<NiFiDataPacket> ctx) throws Exception {
+               while (isRunning) {
+                       final Transaction transaction = 
client.createTransaction(TransferDirection.RECEIVE);
+                       if (transaction == null) {
+                               LOG.warn("A transaction could not be created, 
waiting and will try again...");
+                               try {
+                                       Thread.sleep(waitTimeMs);
+                               } catch (InterruptedException ignored) {
+
+                               }
+                               continue;
+                       }
+
+                       DataPacket dataPacket = transaction.receive();
+                       if (dataPacket == null) {
+                               transaction.confirm();
+                               transaction.complete();
+
+                               LOG.debug("No data available to pull, waiting 
and will try again...");
+                               try {
+                                       Thread.sleep(waitTimeMs);
+                               } catch (InterruptedException ignored) {
+
+                               }
+                               continue;
+                       }
+
+                       final List<NiFiDataPacket> niFiDataPackets = new 
ArrayList<>();
+                       do {
+                               // Read the data into a byte array and wrap it 
along with the attributes
+                               // into a NiFiDataPacket.
+                               final InputStream inStream = 
dataPacket.getData();
+                               final byte[] data = new byte[(int) 
dataPacket.getSize()];
+                               StreamUtils.fillBuffer(inStream, data);
+
+                               final Map<String, String> attributes = 
dataPacket.getAttributes();
+
+                               niFiDataPackets.add(new 
StandardNiFiDataPacket(data, attributes));
+                               dataPacket = transaction.receive();
+                       } while (dataPacket != null);
+
+                       // Confirm transaction to verify the data
+                       transaction.confirm();
+
+                       for (NiFiDataPacket dp : niFiDataPackets) {
+                               ctx.collect(dp);
+                       }
+
+                       transaction.complete();
+               }
+       }
+
+       @Override
+       public void cancel() {
+               isRunning = false;
+       }
+
+       @Override
+       public void close() throws Exception {
+               super.close();
+               client.close();
+       }
+
+       @Override
+       public void stop() {
+               this.isRunning = false;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/StandardNiFiDataPacket.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/StandardNiFiDataPacket.java
 
b/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/StandardNiFiDataPacket.java
new file mode 100644
index 0000000..5ad4bae
--- /dev/null
+++ 
b/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/StandardNiFiDataPacket.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.nifi;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * A serializable {@link NiFiDataPacket} that simply holds the content and the attributes
+ * it was constructed with.
+ */
+public class StandardNiFiDataPacket implements NiFiDataPacket, Serializable {
+       private static final long serialVersionUID = 6364005260220243322L;
+
+       private final byte[] content;
+       private final Map<String, String> attributes;
+
+       /**
+        * Creates a packet around the given content and attributes. Both references are stored
+        * as they are; no copy is made.
+        *
+        * @param content the content of the FlowFile
+        * @param attributes the attributes of the FlowFile
+        */
+       public StandardNiFiDataPacket(final byte[] content, final Map<String, String> attributes) {
+               this.attributes = attributes;
+               this.content = content;
+       }
+
+       /**
+        * @return the content array passed to the constructor
+        */
+       @Override
+       public byte[] getContent() {
+               return this.content;
+       }
+
+       /**
+        * @return the attribute map passed to the constructor
+        */
+       @Override
+       public Map<String, String> getAttributes() {
+               return this.attributes;
+       }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java
 
b/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java
new file mode 100644
index 0000000..572f949
--- /dev/null
+++ 
b/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java
@@ -0,0 +1,55 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.flink.streaming.connectors.nifi.examples;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
+import org.apache.flink.streaming.connectors.nifi.NiFiDataPacketBuilder;
+import org.apache.flink.streaming.connectors.nifi.NiFiSink;
+import org.apache.flink.streaming.connectors.nifi.StandardNiFiDataPacket;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+
+import java.util.HashMap;
+
+/**
+ * An example topology that sends data to a NiFi input port named "Data from Flink".
+ *
+ * <p>The example connects to the NiFi instance at {@code http://localhost:8080/nifi} and sends
+ * a handful of fixed Strings, each as the content of a data packet without attributes.
+ */
+public class NiFiSinkTopologyExample {
+
+       public static void main(String[] args) throws Exception {
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+               SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
+                               .url("http://localhost:8080/nifi")
+                               .portName("Data from Flink")
+                               .buildConfig();
+
+               DataStreamSink<String> dataStream = env.fromElements("one", "two", "three", "four", "five", "q")
+                               .addSink(new NiFiSink<>(clientConfig, new NiFiDataPacketBuilder<String>() {
+                                       @Override
+                                       public NiFiDataPacket createNiFiDataPacket(String s, RuntimeContext ctx) {
+                                               // Encode with a fixed charset so the bytes do not depend on the platform default.
+                                               return new StandardNiFiDataPacket(
+                                                       s.getBytes(java.nio.charset.StandardCharsets.UTF_8),
+                                                       new HashMap<String, String>());
+                                       }
+                               }));
+
+               env.execute();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSourceTopologyExample.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSourceTopologyExample.java
 
b/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSourceTopologyExample.java
new file mode 100644
index 0000000..79c9a1c
--- /dev/null
+++ 
b/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSourceTopologyExample.java
@@ -0,0 +1,58 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.flink.streaming.connectors.nifi.examples;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
+import org.apache.flink.streaming.connectors.nifi.NiFiSource;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+
+import java.nio.charset.Charset;
+
+/**
+ * An example topology that pulls data from a NiFi output port named "Data for Flink".
+ */
+public class NiFiSourceTopologyExample {
+       /** Builds and runs the example job; expects NiFi to be reachable at the URL configured below. */
+       public static void main(String[] args) throws Exception {
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               // Site-to-site client settings for the output port, with a request batch count of 5.
+               SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
+                               .url("http://localhost:8080/nifi")
+                               .portName("Data for Flink")
+                               .requestBatchCount(5)
+                               .buildConfig();
+               // Read NiFi data packets with a source parallelism of 2.
+               SourceFunction<NiFiDataPacket> nifiSource = new NiFiSource(clientConfig);
+               DataStream<NiFiDataPacket> streamSource = env.addSource(nifiSource).setParallelism(2);
+               // Decode each packet's content as text using the platform default charset.
+               DataStream<String> dataStream = streamSource.map(new MapFunction<NiFiDataPacket, String>() {
+                       @Override
+                       public String map(NiFiDataPacket value) throws Exception {
+                               return new String(value.getContent(), Charset.defaultCharset());
+                       }
+               });
+
+               dataStream.print();
+               env.execute();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-nifi/src/test/resources/NiFi_Flink.xml
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-nifi/src/test/resources/NiFi_Flink.xml 
b/flink-connectors/flink-connector-nifi/src/test/resources/NiFi_Flink.xml
new file mode 100644
index 0000000..d373d63
--- /dev/null
+++ b/flink-connectors/flink-connector-nifi/src/test/resources/NiFi_Flink.xml
@@ -0,0 +1,16 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<template><description></description><name>NiFi_Flink</name><snippet><connections><id>34acfdda-dd21-48c0-8779-95d0e258f5cb</id><parentGroupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</parentGroupId><backPressureDataSizeThreshold>0
 
MB</backPressureDataSizeThreshold><backPressureObjectThreshold>0</backPressureObjectThreshold><destination><groupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</groupId><id>8b8b6a2f-ee24-4f32-a178-248f3a3f5482</id><type>PROCESSOR</type></destination><flowFileExpiration>0
 
sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>success</selectedRelationships><source><groupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</groupId><id>769242e5-ee04-4656-a684-ca661a18eed6</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><connections><id>59574e3b-1ba7-4343-b265-af1b67923a85</id><parentGroupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</parentGroupId><backPressureDataSizeThreshold>0
 MB</backPressureDataSizeThreshold><backPressureObjectThresh
 
old>0</backPressureObjectThreshold><destination><groupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</groupId><id>48042218-a51e-45c7-bd30-2290bba8b191</id><type>OUTPUT_PORT</type></destination><flowFileExpiration>0
 
sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>success</selectedRelationships><source><groupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</groupId><id>8b8b6a2f-ee24-4f32-a178-248f3a3f5482</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><connections><id>46c9343f-f732-4e2d-98e1-13caab5d2f5e</id><parentGroupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</parentGroupId><backPressureDataSizeThreshold>0
 
MB</backPressureDataSizeThreshold><backPressureObjectThreshold>0</backPressureObjectThreshold><destination><groupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</groupId><id>e3a9026f-dae1-42ca-851c-02d9fda22094</id><type>PROCESSOR</type></destination><flowFileExpiration>0
 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><source><groupI
 
d>0f854f2b-239f-45f0-bfed-48b5b23f7928</groupId><id>cd8c5227-cfa9-4603-9472-b2234d7bd741</id><type>INPUT_PORT</type></source><zIndex>0</zIndex></connections><inputPorts><id>cd8c5227-cfa9-4603-9472-b2234d7bd741</id><parentGroupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</parentGroupId><position><x>395.0</x><y>520.0</y></position><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><name>Data
 from 
Flink</name><state>RUNNING</state><transmitting>false</transmitting><type>INPUT_PORT</type></inputPorts><outputPorts><id>48042218-a51e-45c7-bd30-2290bba8b191</id><parentGroupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</parentGroupId><position><x>1616.0</x><y>259.0</y></position><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><name>Data
 for 
Flink</name><state>RUNNING</state><transmitting>false</transmitting><type>OUTPUT_PORT</type></outputPorts><processors><id>769242e5-ee04-4656-a684-ca661a18eed6</id><parentGroupId>0f854f2b-239f-45f0-bfed-48
 
b5b23f7928</parentGroupId><position><x>389.0</x><y>231.0</y></position><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><defaultConcurrentTasks><entry><key>TIMER_DRIVEN</key><value>1</value></entry><entry><key>EVENT_DRIVEN</key><value>0</value></entry><entry><key>CRON_DRIVEN</key><value>1</value></entry></defaultConcurrentTasks><defaultSchedulingPeriod><entry><key>TIMER_DRIVEN</key><value>0
 sec</value></entry><entry><key>CRON_DRIVEN</key><value>* * * * * 
?</value></entry></defaultSchedulingPeriod><descriptors><entry><key>File 
Size</key><value><description>The size of the file that will be 
used</description><displayName>File 
Size</displayName><dynamic>false</dynamic><name>File 
Size</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Batch
 Size</key><value><defaultValue>1</defaultValue><description>The number of 
FlowFiles to be transferr
 ed in each invocation</description><displayName>Batch 
Size</displayName><dynamic>false</dynamic><name>Batch 
Size</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Data
 
Format</key><value><allowableValues><displayName>Binary</displayName><value>Binary</value></allowableValues><allowableValues><displayName>Text</displayName><value>Text</value></allowableValues><defaultValue>Binary</defaultValue><description>Specifies
 whether the data should be Text or Binary</description><displayName>Data 
Format</displayName><dynamic>false</dynamic><name>Data 
Format</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Unique
 
FlowFiles</key><value><allowableValues><displayName>true</displayName><value>true</value></allowableValues><allowableValues><displayName>false</displayName><value>false</value></allowableValues><defaultValue>false</defaultValue><description>If
 true, ea
 ch FlowFile that is generated will be unique. If false, a random value will be 
generated and all FlowFiles will get the same content but this offers much 
higher throughput</description><displayName>Unique 
FlowFiles</displayName><dynamic>false</dynamic><name>Unique 
FlowFiles</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry></descriptors><lossTolerant>false</lossTolerant><penaltyDuration>30
 sec</penaltyDuration><properties><entry><key>File Size</key><value>1 
b</value></entry><entry><key>Batch 
Size</key><value>1</value></entry><entry><key>Data 
Format</key><value>Binary</value></entry><entry><key>Unique 
FlowFiles</key><value>false</value></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>2
 
sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1
 
sec</yieldDuration></config><name>GenerateFlowFile</name><relationships><autoTerminate>false</autoTerminate><description></des
 
cription><name>success</name></relationships><state>STOPPED</state><style/><supportsEventDriven>false</supportsEventDriven><supportsParallelProcessing>true</supportsParallelProcessing><type>org.apache.nifi.processors.standard.GenerateFlowFile</type></processors><processors><id>e3a9026f-dae1-42ca-851c-02d9fda22094</id><parentGroupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</parentGroupId><position><x>826.0</x><y>499.0</y></position><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><defaultConcurrentTasks><entry><key>TIMER_DRIVEN</key><value>1</value></entry><entry><key>EVENT_DRIVEN</key><value>0</value></entry><entry><key>CRON_DRIVEN</key><value>1</value></entry></defaultConcurrentTasks><defaultSchedulingPeriod><entry><key>TIMER_DRIVEN</key><value>0
 sec</value></entry><entry><key>CRON_DRIVEN</key><value>* * * * * 
?</value></entry></defaultSchedulingPeriod><descriptors><entry><key>Log 
Level</key><value><al
 
lowableValues><displayName>trace</displayName><value>trace</value></allowableValues><allowableValues><displayName>debug</displayName><value>debug</value></allowableValues><allowableValues><displayName>info</displayName><value>info</value></allowableValues><allowableValues><displayName>warn</displayName><value>warn</value></allowableValues><allowableValues><displayName>error</displayName><value>error</value></allowableValues><defaultValue>info</defaultValue><description>The
 Log Level to use when logging the Attributes</description><displayName>Log 
Level</displayName><dynamic>false</dynamic><name>Log 
Level</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Log
 
Payload</key><value><allowableValues><displayName>true</displayName><value>true</value></allowableValues><allowableValues><displayName>false</displayName><value>false</value></allowableValues><defaultValue>false</defaultValue><description>If
 true, the FlowFile's p
 ayload will be logged, in addition to its attributes; otherwise, just the 
Attributes will be logged.</description><displayName>Log 
Payload</displayName><dynamic>false</dynamic><name>Log 
Payload</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Attributes
 to Log</key><value><description>A comma-separated list of Attributes to Log. 
If not specified, all attributes will be 
logged.</description><displayName>Attributes to 
Log</displayName><dynamic>false</dynamic><name>Attributes to 
Log</name><required>false</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Attributes
 to Ignore</key><value><description>A comma-separated list of Attributes to 
ignore. If not specified, no attributes will be 
ignored.</description><displayName>Attributes to 
Ignore</displayName><dynamic>false</dynamic><name>Attributes to 
Ignore</name><required>false</required><sensitive>false</sensitive><supportsEl>
 false</supportsEl></value></entry><entry><key>Log 
prefix</key><value><description>Log prefix appended to the log lines. It helps 
to distinguish the output of multiple LogAttribute 
processors.</description><displayName>Log 
prefix</displayName><dynamic>false</dynamic><name>Log 
prefix</name><required>false</required><sensitive>false</sensitive><supportsEl>true</supportsEl></value></entry></descriptors><lossTolerant>false</lossTolerant><penaltyDuration>30
 sec</penaltyDuration><properties><entry><key>Log 
Level</key></entry><entry><key>Log 
Payload</key><value>true</value></entry><entry><key>Attributes to 
Log</key></entry><entry><key>Attributes to Ignore</key></entry><entry><key>Log 
prefix</key></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>0
 
sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1
 
sec</yieldDuration></config><name>LogAttribute</name><relationships><autoTerminate>true</autoTerminate><description>All
 FlowFil
 es are routed to this 
relationship</description><name>success</name></relationships><state>RUNNING</state><style/><supportsEventDriven>true</supportsEventDriven><supportsParallelProcessing>true</supportsParallelProcessing><type>org.apache.nifi.processors.standard.LogAttribute</type></processors><processors><id>8b8b6a2f-ee24-4f32-a178-248f3a3f5482</id><parentGroupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</parentGroupId><position><x>1000.0</x><y>231.0</y></position><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><defaultConcurrentTasks><entry><key>TIMER_DRIVEN</key><value>1</value></entry><entry><key>EVENT_DRIVEN</key><value>0</value></entry><entry><key>CRON_DRIVEN</key><value>1</value></entry></defaultConcurrentTasks><defaultSchedulingPeriod><entry><key>TIMER_DRIVEN</key><value>0
 sec</value></entry><entry><key>CRON_DRIVEN</key><value>* * * * * 
?</value></entry></defaultSchedulingPeriod><descriptors><e
 ntry><key>Regular 
Expression</key><value><defaultValue>(?s:^.*$)</defaultValue><description>The 
Regular Expression to search for in the FlowFile 
content</description><displayName>Regular 
Expression</displayName><dynamic>false</dynamic><name>Regular 
Expression</name><required>true</required><sensitive>false</sensitive><supportsEl>true</supportsEl></value></entry><entry><key>Replacement
 Value</key><value><defaultValue>$1</defaultValue><description>The value to 
replace the regular expression with. Back-references to Regular Expression 
capturing groups are supported, but back-references that reference capturing 
groups that do not exist in the regular expression will be treated as literal 
value.</description><displayName>Replacement 
Value</displayName><dynamic>false</dynamic><name>Replacement 
Value</name><required>true</required><sensitive>false</sensitive><supportsEl>true</supportsEl></value></entry><entry><key>Character
 Set</key><value><defaultValue>UTF-8</defaultValue><description>The
  Character Set in which the file is 
encoded</description><displayName>Character 
Set</displayName><dynamic>false</dynamic><name>Character 
Set</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Maximum
 Buffer Size</key><value><defaultValue>1 
MB</defaultValue><description>Specifies the maximum amount of data to buffer 
(per file or per line, depending on the Evaluation Mode) in order to apply the 
regular expressions. If 'Entire Text' (in Evaluation Mode) is selected and the 
FlowFile is larger than this value, the FlowFile will be routed to 'failure'. 
In 'Line-by-Line' Mode, if a single line is larger than this value, the 
FlowFile will be routed to 'failure'. A default value of 1 MB is provided, 
primarily for 'Entire Text' mode. In 'Line-by-Line' Mode, a value such as 8 KB 
or 16 KB is suggested. This value is ignored and the buffer is not used if 
'Regular Expression' is set to '.*'</description><displayName>Maximum Buffer 
 Size</displayName><dynamic>false</dynamic><name>Maximum Buffer 
Size</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Evaluation
 
Mode</key><value><allowableValues><displayName>Line-by-Line</displayName><value>Line-by-Line</value></allowableValues><allowableValues><displayName>Entire
 text</displayName><value>Entire 
text</value></allowableValues><defaultValue>Entire 
text</defaultValue><description>Evaluate the 'Regular Expression' against each 
line (Line-by-Line) or buffer the entire file into memory (Entire Text) and 
then evaluate the 'Regular Expression'.</description><displayName>Evaluation 
Mode</displayName><dynamic>false</dynamic><name>Evaluation 
Mode</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry></descriptors><lossTolerant>false</lossTolerant><penaltyDuration>30
 sec</penaltyDuration><properties><entry><key>Regular 
Expression</key><value>(?s:^.*$)</value><
 /entry><entry><key>Replacement Value</key><value>blah 
blah</value></entry><entry><key>Character 
Set</key><value>UTF-8</value></entry><entry><key>Maximum Buffer 
Size</key><value>1 MB</value></entry><entry><key>Evaluation 
Mode</key><value>Entire 
text</value></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>0
 
sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1
 
sec</yieldDuration></config><name>ReplaceText</name><relationships><autoTerminate>true</autoTerminate><description>FlowFiles
 that could not be updated are routed to this 
relationship</description><name>failure</name></relationships><relationships><autoTerminate>false</autoTerminate><description>FlowFiles
 that have been successfully updated are routed to this relationship, as well 
as FlowFiles whose content does not match the given Regular 
Expression</description><name>success</name></relationships><state>RUNNING</state><style/><supportsEventDriven>true</supports
 
EventDriven><supportsParallelProcessing>true</supportsParallelProcessing><type>org.apache.nifi.processors.standard.ReplaceText</type></processors></snippet><timestamp>09/30/2015
 09:10:38 EDT</timestamp></template>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-rabbitmq/pom.xml 
b/flink-connectors/flink-connector-rabbitmq/pom.xml
new file mode 100644
index 0000000..0b69d66
--- /dev/null
+++ b/flink-connectors/flink-connector-rabbitmq/pom.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-connectors</artifactId>
+               <version>1.2-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-connector-rabbitmq_2.10</artifactId>
+       <name>flink-connector-rabbitmq</name>
+
+       <packaging>jar</packaging>
+
+       <!-- Allow users to pass custom connector versions -->
+       <properties>
+               <rabbitmq.version>3.3.1</rabbitmq.version>
+       </properties>
+
+       <dependencies>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>com.rabbitmq</groupId>
+                       <artifactId>amqp-client</artifactId>
+                       <version>${rabbitmq.version}</version>
+               </dependency>
+
+       </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
 
b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
new file mode 100644
index 0000000..a0795d6
--- /dev/null
+++ 
b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import java.io.IOException;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+
+/**
+ * A sink that publishes every incoming record to a RabbitMQ queue.
+ * @param <IN> The type of the records handed to this sink.
+ */
+public class RMQSink<IN> extends RichSinkFunction<IN> {
+       private static final long serialVersionUID = 1L;
+
+       private static final Logger LOG = LoggerFactory.getLogger(RMQSink.class);
+
+       protected final String queueName;
+       private final RMQConnectionConfig rmqConnectionConfig;
+       protected transient Connection connection;
+       protected transient Channel channel;
+       protected SerializationSchema<IN> schema;
+       private boolean logFailuresOnly = false;
+
+       /**
+        * @param rmqConnectionConfig The RabbitMQ connection configuration {@link RMQConnectionConfig}.
+        * @param queueName The queue to publish messages to.
+        * @param schema A {@link SerializationSchema} for turning the Java objects received into bytes
+        */
+       public RMQSink(RMQConnectionConfig rmqConnectionConfig, String queueName, SerializationSchema<IN> schema) {
+               this.rmqConnectionConfig = rmqConnectionConfig;
+               this.queueName = queueName;
+               this.schema = schema;
+       }
+
+       /**
+        * Sets up the queue. The default implementation just declares the queue. The user may override
+        * this method to have a custom setup for the queue (i.e. binding the queue to an exchange or
+        * defining custom queue parameters)
+        */
+       protected void setupQueue() throws IOException {
+               channel.queueDeclare(queueName, false, false, false, null);
+       }
+
+       /**
+        * Defines whether the producer should fail on errors, or only log them.
+        * If this is set to true, then exceptions will be only logged, if set to false,
+        * exceptions will be eventually thrown and cause the streaming program to
+        * fail (and enter recovery).
+        *
+        * @param logFailuresOnly The flag to indicate logging-only on exceptions.
+        */
+       public void setLogFailuresOnly(boolean logFailuresOnly) {
+               this.logFailuresOnly = logFailuresOnly;
+       }
+
+       /** Opens the connection and a channel, then lets {@link #setupQueue()} prepare the queue. */
+       @Override
+       public void open(Configuration config) throws Exception {
+               ConnectionFactory factory = rmqConnectionConfig.getConnectionFactory();
+               try {
+                       connection = factory.newConnection();
+                       channel = connection.createChannel();
+                       if (channel == null) {
+                               throw new RuntimeException("None of RabbitMQ channels are available");
+                       }
+                       setupQueue();
+               } catch (IOException e) {
+                       throw new RuntimeException("Error while creating the channel", e);
+               }
+       }
+
+       /**
+        * Called when new data arrives to the sink, and forwards it to RMQ.
+        *
+        * @param value The incoming data
+        */
+       @Override
+       public void invoke(IN value) {
+               try {
+                       byte[] msg = schema.serialize(value);
+                       // Publish via the default exchange ("") with the queue name as routing key.
+                       channel.basicPublish("", queueName, null, msg);
+               } catch (IOException e) {
+                       if (logFailuresOnly) {
+                               LOG.error("Cannot send RMQ message {} at {}", queueName, rmqConnectionConfig.getHost(), e);
+                       } else {
+                               throw new RuntimeException("Cannot send RMQ message " + queueName +" at " + rmqConnectionConfig.getHost(), e);
+                       }
+               }
+       }
+
+       /** Closes channel and connection; either may still be null if {@link #open} failed early. */
+       @Override
+       public void close() {
+               IOException t = null;
+               try {
+                       if (channel != null) {
+                               channel.close();
+                       }
+               } catch (IOException e) {
+                       t = e;
+               }
+               try {
+                       if (connection != null) {
+                               connection.close();
+                       }
+               } catch (IOException e) {
+                       if(t != null) {
+                               LOG.warn("Both channel and connection closing failed. Logging channel exception and failing with connection exception", t);
+                       }
+                       t = e;
+               }
+               if(t != null) {
+                       throw new RuntimeException("Error while closing RMQ connection with " + queueName + " at " + rmqConnectionConfig.getHost(), t);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
 
b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
new file mode 100644
index 0000000..ee9c3b9
--- /dev/null
+++ 
b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase;
+import 
org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import 
org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.util.Preconditions;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.QueueingConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * RabbitMQ source (consumer) which reads from a queue and acknowledges 
messages on checkpoints.
+ * When checkpointing is enabled, it guarantees exactly-once processing 
semantics.
+ *
+ * RabbitMQ requires messages to be acknowledged. On failures, RabbitMQ will resend all messages
+ * which have not been acknowledged previously. When a failure occurs directly 
after a completed
+ * checkpoint, all messages part of this checkpoint might be processed again 
because they couldn't
+ * be acknowledged before failure. This case is handled by the {@link 
MessageAcknowledgingSourceBase}
+ * base class which deduplicates the messages using the correlation id.
+ *
+ * RabbitMQ's Delivery Tags do NOT represent unique ids / offsets. That's why 
the source uses the
+ * Correlation ID in the message properties to check for duplicate messages. 
Note that the
+ * correlation id has to be set at the producer. If the correlation id is not 
set, messages may
+ * be produced more than once in corner cases.
+ *
+ * This source can be operated in three different modes:
+ *
+ * 1) Exactly-once (when checkpointed) with RabbitMQ transactions and messages 
with
+ *    unique correlation IDs.
+ * 2) At-least-once (when checkpointed) with RabbitMQ transactions but no 
deduplication mechanism
+ *    (correlation id is not set).
+ * 3) No strong delivery guarantees (without checkpointing) with RabbitMQ 
auto-commit mode.
+ *
+ * Users may override the setupConnectionFactory() method to set up their own
+ * ConnectionFactory in case the constructor parameters are not sufficient.
+ *
+ * @param <OUT> The type of the data read from RabbitMQ.
+ */
+public class RMQSource<OUT> extends 
MultipleIdsMessageAcknowledgingSourceBase<OUT, String, Long>
+       implements ResultTypeQueryable<OUT> {
+
+       private static final long serialVersionUID = 1L;
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(RMQSource.class);
+
+       private final RMQConnectionConfig rmqConnectionConfig;
+       protected final String queueName;
+       private final boolean usesCorrelationId;
+       protected DeserializationSchema<OUT> schema;
+
+       protected transient Connection connection;
+       protected transient Channel channel;
+       protected transient QueueingConsumer consumer;
+
+       protected transient boolean autoAck;
+
+       private transient volatile boolean running;
+
+       /**
+        * Creates a new RabbitMQ source with at-least-once message processing guarantee when
+        * checkpointing is enabled. No strong delivery guarantees when checkpointing is disabled.
+        * For exactly-once, please use the constructor
+        * {@link RMQSource#RMQSource(RMQConnectionConfig, String, boolean, DeserializationSchema)},
+        * set {@code usesCorrelationId} to true and enable checkpointing.
+        *
+        * <p>This constructor delegates to that constructor with {@code usesCorrelationId} set to
+        * {@code false}.
+        *
+        * @param rmqConnectionConfig The RabbitMQ connection configuration {@link RMQConnectionConfig}.
+        * @param queueName  The queue to receive messages from.
+        * @param deserializationSchema A {@link DeserializationSchema} for turning the bytes received
+        *                              into Java objects.
+        */
+       public RMQSource(RMQConnectionConfig rmqConnectionConfig, String 
queueName,
+                                       DeserializationSchema<OUT> 
deserializationSchema) {
+               this(rmqConnectionConfig, queueName, false, 
deserializationSchema);
+       }
+
+       /**
+        * Creates a new RabbitMQ source. For exactly-once, you must set the correlation ids of messages
+        * at the producer. The correlation id must be unique. Otherwise the behavior of the source is
+        * undefined. In doubt, set {@code usesCorrelationId} to false. When correlation ids are not
+        * used, this source has at-least-once processing semantics when checkpointing is enabled.
+        *
+        * <p>The arguments are stored as given; the connection itself is only established in
+        * {@link #open(Configuration)}.
+        *
+        * @param rmqConnectionConfig The RabbitMQ connection configuration {@link RMQConnectionConfig}.
+        * @param queueName The queue to receive messages from.
+        * @param usesCorrelationId Whether the messages received are supplied with a <b>unique</b>
+        *                          id to deduplicate messages (in case of failed acknowledgments).
+        *                          Only used when checkpointing is enabled.
+        * @param deserializationSchema A {@link DeserializationSchema} for turning the bytes received
+        *                              into Java objects.
+        */
+       public RMQSource(RMQConnectionConfig rmqConnectionConfig,
+                                       String queueName, boolean 
usesCorrelationId,DeserializationSchema<OUT> deserializationSchema) {
+               super(String.class);
+               this.rmqConnectionConfig = rmqConnectionConfig;
+               this.queueName = queueName;
+               this.usesCorrelationId = usesCorrelationId;
+               this.schema = deserializationSchema;
+       }
+
+       /**
+        * Initializes the connection to RMQ with a default connection factory. 
The user may override
+        * this method to setup and configure their own ConnectionFactory.
+        */
+       protected ConnectionFactory setupConnectionFactory() throws Exception {
+               return rmqConnectionConfig.getConnectionFactory();
+       }
+
+       /**
+        * Sets up the queue. The default implementation just declares the 
queue. The user may override
+        * this method to have a custom setup for the queue (i.e. binding the 
queue to an exchange or
+        * defining custom queue parameters)
+        */
+       protected void setupQueue() throws IOException {
+               channel.queueDeclare(queueName, true, false, false, null);
+       }
+
+       /**
+        * Opens the connection and channel, sets up the queue and registers the consumer.
+        *
+        * <p>If the runtime context is a {@link StreamingRuntimeContext} with checkpointing enabled,
+        * automatic acknowledgement is turned off and the channel is switched to transaction mode;
+        * acknowledgements are then issued by {@link #acknowledgeSessionIDs(List)}. Otherwise the
+        * consumer is registered with automatic acknowledgement.
+        *
+        * @param config configuration forwarded to the superclass
+        * @throws RuntimeException if no channel could be created, or wrapping any
+        *                          {@link IOException} raised while connecting or setting up the queue
+        * @throws Exception if the superclass or {@link #setupConnectionFactory()} fails
+        */
+       @Override
+       public void open(Configuration config) throws Exception {
+               super.open(config);
+               ConnectionFactory factory = setupConnectionFactory();
+               try {
+                       connection = factory.newConnection();
+                       channel = connection.createChannel();
+                       if (channel == null) {
+                               throw new RuntimeException("None of RabbitMQ channels are available");
+                       }
+                       setupQueue();
+                       consumer = new QueueingConsumer(channel);
+
+                       RuntimeContext runtimeContext = getRuntimeContext();
+                       if (runtimeContext instanceof StreamingRuntimeContext
+                                       && ((StreamingRuntimeContext) runtimeContext).isCheckpointingEnabled()) {
+                               autoAck = false;
+                               // enables transaction mode
+                               channel.txSelect();
+                       } else {
+                               autoAck = true;
+                       }
+
+                       // parameterized message: no string is built unless debug logging is enabled
+                       LOG.debug("Starting RabbitMQ source with autoAck status: {}", autoAck);
+                       channel.basicConsume(queueName, autoAck, consumer);
+
+               } catch (IOException e) {
+                       throw new RuntimeException("Cannot create RMQ connection with " + queueName + " at "
+                                       + rmqConnectionConfig.getHost(), e);
+               }
+               // only mark the source as running once the consumer is registered
+               running = true;
+       }
+
+       @Override
+       public void close() throws Exception {
+               super.close();
+               try {
+                       connection.close();
+               } catch (IOException e) {
+                       throw new RuntimeException("Error while closing RMQ 
connection with " + queueName
+                               + " at " + rmqConnectionConfig.getHost(), e);
+               }
+       }
+
+
+       /**
+        * Main loop of the source: takes deliveries from the consumer and emits the deserialized
+        * records until {@link #cancel()} clears the running flag or the schema reports the end of
+        * the stream.
+        *
+        * <p>Each delivery is handled while holding the checkpoint lock. Without automatic
+        * acknowledgement the delivery tag is added to {@code sessionIds}; if correlation ids are
+        * in use, a delivery whose correlation id is rejected by {@code addId} is skipped without
+        * being emitted or recorded.
+        *
+        * @param ctx the context used to obtain the checkpoint lock and to emit records
+        * @throws Exception if fetching or deserializing a delivery fails
+        */
+       @Override
+       public void run(SourceContext<OUT> ctx) throws Exception {
+               while (running) {
+                       final QueueingConsumer.Delivery message = consumer.nextDelivery();
+
+                       synchronized (ctx.getCheckpointLock()) {
+                               final OUT record = schema.deserialize(message.getBody());
+
+                               // nothing follows the loop, so returning is the same as leaving it
+                               if (schema.isEndOfStream(record)) {
+                                       return;
+                               }
+
+                               // auto-acknowledged deliveries need no bookkeeping
+                               if (autoAck) {
+                                       ctx.collect(record);
+                                       continue;
+                               }
+
+                               final long tag = message.getEnvelope().getDeliveryTag();
+                               if (usesCorrelationId) {
+                                       final String id = message.getProperties().getCorrelationId();
+                                       Preconditions.checkNotNull(id,
+                                               "RabbitMQ source was instantiated "
+                                                       + "with usesCorrelationId set to true but a message was received with "
+                                                       + "correlation id set to null!");
+                                       final boolean firstTimeSeen = addId(id);
+                                       if (!firstTimeSeen) {
+                                               // we have already processed this message
+                                               continue;
+                                       }
+                               }
+                               sessionIds.add(tag);
+
+                               ctx.collect(record);
+                       }
+               }
+       }
+
+       @Override
+       public void cancel() {
+               running = false; // run() re-checks this volatile flag at the top of every loop iteration
+       }
+
+       @Override
+       protected void acknowledgeSessionIDs(List<Long> sessionIds) {
+               try {
+                       for (long id : sessionIds) {
+                               channel.basicAck(id, false);
+                       }
+                       channel.txCommit();
+               } catch (IOException e) {
+                       throw new RuntimeException("Messages could not be 
acknowledged during checkpoint creation.", e);
+               }
+       }
+
+       /**
+        * Reports the type of the records this source emits, as declared by the
+        * deserialization schema.
+        *
+        * @return the type information obtained from the schema
+        */
+       @Override
+       public TypeInformation<OUT> getProducedType() {
+               final TypeInformation<OUT> producedType = schema.getProducedType();
+               return producedType;
+       }
+}

Reply via email to