MINIFI-6: Add Site2Site
Adjust README

Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/e170f7aa
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/e170f7aa
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/e170f7aa

Branch: refs/heads/master
Commit: e170f7aa47919ac8deb3a9df5b6e98f0e15754cc
Parents: 7956696
Author: Bin Qiu <[email protected]>
Authored: Thu Jul 14 09:13:34 2016 -0700
Committer: Aldrin Piri <[email protected]>
Committed: Tue Aug 2 11:37:42 2016 -0400

----------------------------------------------------------------------
 Makefile                             |    4 +-
 README.md                            |   56 +-
 conf/flow.xml                        |  215 ++++-
 conf/flowServer.xml                  |  130 +++
 conf/flow_Site2SiteServer.xml        |  140 +++
 conf/nifi.properties                 |    6 +
 inc/Exception.h                      |    2 +
 inc/FlowControlProtocol.h            |    5 +
 inc/FlowController.h                 |   11 +
 inc/ProcessGroup.h                   |   42 +
 inc/RealTimeDataCollector.h          |  131 +++
 inc/RemoteProcessorGroupPort.h       |   96 +++
 inc/Site2SiteClientProtocol.h        |  633 ++++++++++++++
 inc/Site2SitePeer.h                  |  359 ++++++++
 inc/TimeUtil.h                       |    2 +-
 main/MiNiFiMain.cpp                  |    2 +-
 src/FlowControlProtocol.cpp          |    2 +
 src/FlowController.cpp               |  223 ++++-
 src/GenerateFlowFile.cpp             |    1 +
 src/LogAttribute.cpp                 |    5 +-
 src/ProcessGroup.cpp                 |    3 +
 src/ProcessSession.cpp               |   22 +-
 src/RealTimeDataCollector.cpp        |  482 +++++++++++
 src/RemoteProcessorGroupPort.cpp     |   99 +++
 src/Site2SiteClientProtocol.cpp      | 1313 +++++++++++++++++++++++++++++
 src/Site2SitePeer.cpp                |  434 ++++++++++
 target/conf/flow.xml                 |  131 ++-
 target/conf/flowServer.xml           |  130 +++
 target/conf/flow_Site2SiteServer.xml |  140 +++
 target/conf/nifi.properties          |    2 +-
 thirdparty/uuid/tst_uuid             |  Bin 191032 -> 29660 bytes
 31 files changed, 4753 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/Makefile
----------------------------------------------------------------------
diff --git a/Makefile b/Makefile
index 3128422..963f473 100644
--- a/Makefile
+++ b/Makefile
@@ -28,7 +28,7 @@ LDDIRECTORY=-L./build -L./thirdparty/uuid 
-L./thirdparty/libxml2/.libs/ #-L/usr/
 #LDFLAGS=-lminifi -lxml2 -lleveldb -pthread -luuid
 LDFLAGS=-static -lminifi -lxml2 -pthread -luuid
 else ifeq ($(ARCH), linux)
-CFLAGS=-O0 -fexceptions -fpermissive -Wno-write-strings -std=c++11 -fPIC -Wall 
-g -Wno-unused-private-field
+CFLAGS=-O0 -fexceptions -fpermissive -Wno-write-strings -std=c++11 -fPIC -Wall 
-g
 INCLUDES=-I./inc -I./src -I./thirdparty -I./test 
-I./thirdparty/libxml2/include #-I/usr/local/opt/leveldb/include/
 LDDIRECTORY=-L./build -L./thirdparty/uuid -L./thirdparty/libxml2/.libs/ 
#-L/usr/local/opt/leveldb/lib
 #LDFLAGS=-lminifi -lxml2 -lleveldb -pthread -luuid
@@ -37,7 +37,7 @@ else
 CFLAGS=-O0 -fexceptions -fpermissive -Wno-write-strings -std=c++11 -fPIC -Wall 
-g -Wno-unused-private-field
 INCLUDES=-I./inc -I./src -I./test -I/usr/include/libxml2 
#-I/usr/local/opt/leveldb/include/
 LDDIRECTORY=-L./build #-L/usr/local/opt/leveldb/out-static/
-LDFLAGS=-lminifi -lxml2 -pthread -luuid#--llevedb
+LDFLAGS=-lminifi -lxml2 -pthread -luuid #--llevedb
 endif
 
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index d575cb1..6c4137e 100644
--- a/README.md
+++ b/README.md
@@ -37,8 +37,6 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 
or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 ## Dependencies
-   * [LevelDB](https://github.com/google/leveldb) - tested with v1.18
-     MAC: brew install leveldb
    * gcc - 4.8.4
    * g++ - 4.8.4
    * [libxml2](http://xmlsoft.org/) - tested with 2.9.1
@@ -48,26 +46,58 @@ limitations under the License.
 
 ## Build instructions
 
-Build application
+Build the application. This builds the minifi executable under build and copies it to the 
target directory
  
-   $ make clean
    $ make
 
-Build tests
-   
-   $ make tests
-
 Clean 
    
    $ make clean
 
-
 ## Running 
 
 Running application
-The nifi flow.xml and nifi.properties are in target/conf
-   $ ./target/minifi
 
-Runnning tests 
+   $ ./target/minifi
 
-   $ ./build/FlowFileRecordTest 
+The Native MiNiFi example flow.xml is in target/conf
+It showcases a Native MiNiFi client which can generate a flowfile, log the flowfile 
and push it to the NiFi server.
+Also it can pull flowfile from NiFi server and log the flowfile.
+The NiFi server config is target/conf/flow_Site2SiteServer.xml
+
+For trial command control protocol between Native MiNiFi and NiFi Server, 
please see the example NiFi Server implementation in test/Server.cpp
+The command control protocol is not finalized yet. 
+
+Caveat:
+1) 
+Add new properties Host Name and Port into the RemoteProcessGroup input/output port for 
remote Site2Site hostname and port
+<remoteProcessGroup>
+      <id>8f3b248f-d493-4269-b317-36f85719f480</id>
+      <name>NiFi Flow</name>
+      <url>http://localhost:8081/nifi</url>
+      <timeout>30 sec</timeout>
+      <yieldPeriod>1 sec</yieldPeriod>
+      <transmitting>true</transmitting>
+      <inputPort>
+        <id>471deef6-2a6e-4a7d-912a-81cc17e3a204</id>
+        <name> From Node A</name>
+        <position x="0.0" y="0.0"/>
+        <comments/>
+        <scheduledState>RUNNING</scheduledState>
+        <maxConcurrentTasks>1</maxConcurrentTasks>
+        <useCompression>false</useCompression>
+        <property>
+            <name>Host Name</name>
+                <value>localhost</value>
+        </property>
+        <property>
+            <name>Port</name>
+            <value>10001</value>
+        </property>
+      </inputPort>
+2)
+Add new properties into nifi.properties for command control 
+# MiNiFi Server for Command Control
+nifi.server.name=localhost
+nifi.server.port=9000
+nifi.server.report.interval=1000 ms

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/conf/flow.xml
----------------------------------------------------------------------
diff --git a/conf/flow.xml b/conf/flow.xml
index a29ea1f..51b74e8 100644
--- a/conf/flow.xml
+++ b/conf/flow.xml
@@ -8,14 +8,14 @@
     <position x="0.0" y="0.0"/>
     <comment/>
     <processor>
-      <id>572aa3f3-6288-4ca1-ae43-5e492cb0ea23</id>
-      <name>RealTimeDataCollector</name>
-      <position x="3259.732177734375" y="1739.991943359375"/>
+      <id>e01275ae-ac38-48f9-ac53-1a44df1be88e</id>
+      <name>LogAttribute</name>
+      <position x="3950.0958625440016" y="1355.8949219185629"/>
       <styles/>
       <comment/>
-      <class>org.apache.nifi.processors.standard.RealTimeDataCollector</class>
-      <maxConcurrentTasks>2</maxConcurrentTasks>
-      <schedulingPeriod>1 ms</schedulingPeriod>
+      <class>org.apache.nifi.processors.standard.LogAttribute</class>
+      <maxConcurrentTasks>1</maxConcurrentTasks>
+      <schedulingPeriod>0 sec</schedulingPeriod>
       <penalizationPeriod>30 sec</penalizationPeriod>
       <yieldPeriod>1 sec</yieldPeriod>
       <bulletinLevel>WARN</bulletinLevel>
@@ -24,51 +24,206 @@
       <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
       <runDurationNanos>0</runDurationNanos>
       <property>
-        <name>File Name</name>
-        <value>data.osp></value>
+        <name>Log Level</name>
+        <value>info</value>
       </property>
       <property>
-        <name>Real Time Server Name</name>
-        <value>localhost</value>
+        <name>Log Payload</name>
+        <value>false</value>
       </property>
       <property>
-        <name>Real Time Server Port</name>
-        <value>10000</value>
+        <name>Attributes to Log</name>
       </property>
       <property>
-        <name>Batch Server Name</name>
-        <value>localhost</value>
+        <name>Attributes to Ignore</name>
       </property>
       <property>
-        <name>Batch Server Port</name>
-        <value>10001</value>
+        <name>Log prefix</name>
       </property>
+      <autoTerminatedRelationship>success</autoTerminatedRelationship>
+    </processor>
+    <processor>
+      <id>572aa3f3-6288-4ca1-ae43-5e492cb0ea23</id>
+      <name>LogAttribute</name>
+      <position x="3259.732177734375" y="1739.991943359375"/>
+      <styles/>
+      <comment/>
+      <class>org.apache.nifi.processors.standard.LogAttribute</class>
+      <maxConcurrentTasks>1</maxConcurrentTasks>
+      <schedulingPeriod>0 sec</schedulingPeriod>
+      <penalizationPeriod>30 sec</penalizationPeriod>
+      <yieldPeriod>1 sec</yieldPeriod>
+      <bulletinLevel>WARN</bulletinLevel>
+      <lossTolerant>false</lossTolerant>
+      <scheduledState>RUNNING</scheduledState>
+      <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
+      <runDurationNanos>0</runDurationNanos>
       <property>
-        <name>Iteration</name>
-        <value>true</value>
+        <name>Log Level</name>
+        <value>info</value>
       </property>
       <property>
-        <name>Real Time Message ID</name>
-        <value>41</value>
+        <name>Log Payload</name>
+        <value>false</value>
       </property>
       <property>
-        <name>Batch Message ID</name>
-        <value>172, 30, 48</value>
+        <name>Attributes to Log</name>
       </property>
       <property>
-        <name>Real Time Interval</name>
-        <value>10 ms</value>
+        <name>Attributes to Ignore</name>
       </property>
       <property>
-        <name>Batch Time Interval</name>
-        <value>100 ms</value>
+        <name>Log prefix</name>
       </property>
+      <autoTerminatedRelationship>success</autoTerminatedRelationship>
+    </processor>
+    <processor>
+      <id>a0e57bb2-5b89-438e-8869-0326bbdbbe43</id>
+      <name>GenerateFlowFile</name>
+      <position x="2643.1135987796815" y="1457.4419966791334"/>
+      <styles/>
+      <comment/>
+      <class>org.apache.nifi.processors.standard.GenerateFlowFile</class>
+      <maxConcurrentTasks>1</maxConcurrentTasks>
+      <schedulingPeriod>1 s</schedulingPeriod>
+      <penalizationPeriod>30 sec</penalizationPeriod>
+      <yieldPeriod>1 sec</yieldPeriod>
+      <bulletinLevel>WARN</bulletinLevel>
+      <lossTolerant>false</lossTolerant>
+      <scheduledState>RUNNING</scheduledState>
+      <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
+      <runDurationNanos>0</runDurationNanos>
       <property>
-        <name>Batch Max Buffer Size</name>
-        <value>1048576</value>
+        <name>File Size</name>
+        <value>1024 kB</value>
+      </property>
+      <property>
+        <name>Batch Size</name>
+        <value>1</value>
+      </property>
+      <property>
+        <name>Data Format</name>
+        <value>Text</value>
+      </property>
+      <property>
+        <name>Unique FlowFiles</name>
+        <value>false</value>
       </property>
-      <autoTerminatedRelationship>success</autoTerminatedRelationship>
     </processor>
+    <label>
+      <id>809d63d9-6feb-496a-9dc3-d23c217e52fd</id>
+      <position x="3635.581271381991" y="1309.9918825902428"/>
+      <size height="193.5023651123047" width="641.0671997070312"/>
+      <styles>
+        <style name="background-color">#9a91ff</style>
+        <style name="font-size">16px</style>
+      </styles>
+      <value>Pull From Node B</value>
+    </label>
+    <label>
+      <id>d95ce8d3-c005-4d0b-8fcc-b2f6fae7172f</id>
+      <position x="2601.7320892530847" y="1413.1875613011803"/>
+      <size height="193.5023651123047" width="641.0671997070312"/>
+      <styles>
+        <style name="font-size">16px</style>
+      </styles>
+      <value>Push to Node B</value>
+    </label>
+    <remoteProcessGroup>
+      <id>8f3b248f-d493-4269-b317-36f85719f480</id>
+      <name>NiFi Flow</name>
+      <position x="3254.3356850982673" y="1432.3274284388426"/>
+      <comment/>
+      <url>http://localhost:8081/nifi</url>
+      <timeout>30 sec</timeout>
+      <yieldPeriod>1 sec</yieldPeriod>
+      <transmitting>true</transmitting>
+      <inputPort>
+        <id>471deef6-2a6e-4a7d-912a-81cc17e3a204</id>
+        <name> From Node A</name>
+        <position x="0.0" y="0.0"/>
+        <comments/>
+        <scheduledState>RUNNING</scheduledState>
+        <maxConcurrentTasks>1</maxConcurrentTasks>
+        <useCompression>false</useCompression>
+        <property>
+            <name>Host Name</name>
+               <value>localhost</value>
+        </property>
+        <property>
+            <name>Port</name>
+            <value>10001</value>
+        </property>
+      </inputPort>
+      <outputPort>
+        <id>75f88005-0a87-4fef-8320-6219cdbcf18b</id>
+        <name>To A</name>
+        <position x="0.0" y="0.0"/>
+        <comments/>
+        <scheduledState>RUNNING</scheduledState>
+        <maxConcurrentTasks>1</maxConcurrentTasks>
+        <useCompression>false</useCompression>
+        <property>
+            <name>Host Name</name>
+            <value>localhost</value>
+        </property>
+        <property>
+            <name>Port</name>
+            <value>10001</value>
+        </property>
+      </outputPort>
+    </remoteProcessGroup>
+    <connection>
+      <id>c4cf70d8-be05-4c3d-b926-465f330d6503</id>
+      <name/>
+      <bendPoints/>
+      <labelIndex>1</labelIndex>
+      <zIndex>0</zIndex>
+      <sourceId>a0e57bb2-5b89-438e-8869-0326bbdbbe43</sourceId>
+      <sourceGroupId>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</sourceGroupId>
+      <sourceType>PROCESSOR</sourceType>
+      <destinationId>572aa3f3-6288-4ca1-ae43-5e492cb0ea23</destinationId>
+      
<destinationGroupId>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</destinationGroupId>
+      <destinationType>PROCESSOR</destinationType>
+      <relationship>success</relationship>
+      <maxWorkQueueSize>0</maxWorkQueueSize>
+      <maxWorkQueueDataSize>0 MB</maxWorkQueueDataSize>
+      <flowFileExpiration>0 sec</flowFileExpiration>
+    </connection>
+    <connection>
+      <id>c9573abe-937c-464b-b18d-48b29c42dce2</id>
+      <name>site2siteSEND</name>
+      <bendPoints/>
+      <labelIndex>1</labelIndex>
+      <zIndex>0</zIndex>
+      <sourceId>a0e57bb2-5b89-438e-8869-0326bbdbbe43</sourceId>
+      <sourceGroupId>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</sourceGroupId>
+      <sourceType>PROCESSOR</sourceType>
+      <destinationId>471deef6-2a6e-4a7d-912a-81cc17e3a204</destinationId>
+      
<destinationGroupId>8f3b248f-d493-4269-b317-36f85719f480</destinationGroupId>
+      <destinationType>REMOTE_INPUT_PORT</destinationType>
+      <relationship>success</relationship>
+      <maxWorkQueueSize>0</maxWorkQueueSize>
+      <maxWorkQueueDataSize>0 MB</maxWorkQueueDataSize>
+      <flowFileExpiration>0 sec</flowFileExpiration>
+    </connection>
+    <connection>
+      <id>2cb90b4c-d6cb-4fef-8f0f-b16459561af5</id>
+      <name>site2siteReceive</name>
+      <bendPoints/>
+      <labelIndex>1</labelIndex>
+      <zIndex>0</zIndex>
+      <sourceId>75f88005-0a87-4fef-8320-6219cdbcf18b</sourceId>
+      <sourceGroupId>8f3b248f-d493-4269-b317-36f85719f480</sourceGroupId>
+      <sourceType>REMOTE_OUTPUT_PORT</sourceType>
+      <destinationId>e01275ae-ac38-48f9-ac53-1a44df1be88e</destinationId>
+      
<destinationGroupId>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</destinationGroupId>
+      <destinationType>PROCESSOR</destinationType>
+      <relationship/>
+      <maxWorkQueueSize>0</maxWorkQueueSize>
+      <maxWorkQueueDataSize>0 MB</maxWorkQueueDataSize>
+      <flowFileExpiration>0 sec</flowFileExpiration>
+    </connection>
   </rootGroup>
   <controllerServices>
     <controllerService>
@@ -114,7 +269,7 @@
       </property>
       <property>
         <name>Truststore Password</name>
-        
<value>enc{9E2EE146023A0F31914706460EB177B357796CF0C768DECE09D10C4B40F344C8}</value>
+        
<value>enc{3A31531B76B6395A72FB8BEB4C93E2040877D07C04FDAB5A84499B918BECEB77}</value>
       </property>
       <property>
         <name>Truststore Type</name>

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/conf/flowServer.xml
----------------------------------------------------------------------
diff --git a/conf/flowServer.xml b/conf/flowServer.xml
new file mode 100644
index 0000000..caca3eb
--- /dev/null
+++ b/conf/flowServer.xml
@@ -0,0 +1,130 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<flowController>
+  <maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount>
+  <maxEventDrivenThreadCount>5</maxEventDrivenThreadCount>
+  <rootGroup>
+    <id>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</id>
+    <name>NiFi Flow</name>
+    <position x="0.0" y="0.0"/>
+    <comment/>
+    <processor>
+      <id>572aa3f3-6288-4ca1-ae43-5e492cb0ea23</id>
+      <name>RealTimeDataCollector</name>
+      <position x="3259.732177734375" y="1739.991943359375"/>
+      <styles/>
+      <comment/>
+      <class>org.apache.nifi.processors.standard.RealTimeDataCollector</class>
+      <maxConcurrentTasks>2</maxConcurrentTasks>
+      <schedulingPeriod>10 ms</schedulingPeriod>
+      <penalizationPeriod>30 sec</penalizationPeriod>
+      <yieldPeriod>1 sec</yieldPeriod>
+      <bulletinLevel>WARN</bulletinLevel>
+      <lossTolerant>false</lossTolerant>
+      <scheduledState>RUNNING</scheduledState>
+      <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
+      <runDurationNanos>0</runDurationNanos>
+      <property>
+        <name>File Name</name>
+        <value>data.osp</value>
+      </property>
+      <property>
+        <name>Real Time Server Name</name>
+        <value>localhost</value>
+      </property>
+      <property>
+        <name>Real Time Server Port</name>
+        <value>10000</value>
+      </property>
+      <property>
+        <name>Batch Server Name</name>
+        <value>localhost</value>
+      </property>
+      <property>
+        <name>Batch Server Port</name>
+        <value>10001</value>
+      </property>
+      <property>
+        <name>Iteration</name>
+        <value>true</value>
+      </property>
+      <property>
+        <name>Real Time Message ID</name>
+        <value>41</value>
+      </property>
+      <property>
+        <name>Batch Message ID</name>
+        <value>172,48</value>
+      </property>
+      <property>
+        <name>Real Time Interval</name>
+        <value>200 ms</value>
+      </property>
+      <property>
+        <name>Batch Time Interval</name>
+        <value>1 sec</value>
+      </property>
+      <property>
+        <name>Batch Max Buffer Size</name>
+        <value>262144</value>
+      </property>
+      <autoTerminatedRelationship>success</autoTerminatedRelationship>
+    </processor>
+  </rootGroup>
+  <controllerServices>
+    <controllerService>
+      <id>b2785fb0-e797-4c4d-8592-d2b2563504c4</id>
+      <name>DistributedMapCacheClientService</name>
+      <comment/>
+      
<class>org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService</class>
+      <enabled>true</enabled>
+      <property>
+        <name>Server Hostname</name>
+        <value>localhost</value>
+      </property>
+      <property>
+        <name>Server Port</name>
+        <value>4557</value>
+      </property>
+      <property>
+        <name>SSL Context Service</name>
+      </property>
+      <property>
+        <name>Communications Timeout</name>
+        <value>30 secs</value>
+      </property>
+    </controllerService>
+    <controllerService>
+      <id>2855f1e0-dc35-4955-9ae2-b2d7d1765d4e</id>
+      <name>StandardSSLContextService</name>
+      <comment/>
+      <class>org.apache.nifi.ssl.StandardSSLContextService</class>
+      <enabled>true</enabled>
+      <property>
+        <name>Keystore Filename</name>
+      </property>
+      <property>
+        <name>Keystore Password</name>
+      </property>
+      <property>
+        <name>Keystore Type</name>
+      </property>
+      <property>
+        <name>Truststore Filename</name>
+        
<value>/Library/Java/JavaVirtualMachines/jdk1.8.0_73.jdk/Contents/Home/jre/lib/security/cacerts</value>
+      </property>
+      <property>
+        <name>Truststore Password</name>
+        
<value>enc{9E2EE146023A0F31914706460EB177B357796CF0C768DECE09D10C4B40F344C8}</value>
+      </property>
+      <property>
+        <name>Truststore Type</name>
+        <value>JKS</value>
+      </property>
+      <property>
+        <name>SSL Protocol</name>
+        <value>TLS</value>
+      </property>
+    </controllerService>
+  </controllerServices>
+  <reportingTasks/>
+</flowController>

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/conf/flow_Site2SiteServer.xml
----------------------------------------------------------------------
diff --git a/conf/flow_Site2SiteServer.xml b/conf/flow_Site2SiteServer.xml
new file mode 100644
index 0000000..acd2c1e
--- /dev/null
+++ b/conf/flow_Site2SiteServer.xml
@@ -0,0 +1,140 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<flowController>
+  <maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount>
+  <maxEventDrivenThreadCount>5</maxEventDrivenThreadCount>
+  <rootGroup>
+    <id>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</id>
+    <name>NiFi Flow</name>
+    <position x="0.0" y="0.0"/>
+    <comment/>
+    <processor>
+      <id>cd274fef-168a-486b-b21a-04ed17f981b7</id>
+      <name>LogAttribute</name>
+      <position x="2823.8107761867964" y="623.2524160253959"/>
+      <styles/>
+      <comment/>
+      <class>org.apache.nifi.processors.standard.LogAttribute</class>
+      <maxConcurrentTasks>1</maxConcurrentTasks>
+      <schedulingPeriod>0 sec</schedulingPeriod>
+      <penalizationPeriod>30 sec</penalizationPeriod>
+      <yieldPeriod>1 sec</yieldPeriod>
+      <bulletinLevel>WARN</bulletinLevel>
+      <lossTolerant>false</lossTolerant>
+      <scheduledState>RUNNING</scheduledState>
+      <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
+      <runDurationNanos>0</runDurationNanos>
+      <property>
+        <name>Log Level</name>
+        <value>info</value>
+      </property>
+      <property>
+        <name>Log Payload</name>
+        <value>true</value>
+      </property>
+      <property>
+        <name>Attributes to Log</name>
+      </property>
+      <property>
+        <name>Attributes to Ignore</name>
+      </property>
+      <property>
+        <name>Log prefix</name>
+      </property>
+      <autoTerminatedRelationship>success</autoTerminatedRelationship>
+    </processor>
+    <processor>
+      <id>4fa35a7d-d1f0-44e4-87d7-7d69f0b78b7b</id>
+      <name>GenerateFlowFile</name>
+      <position x="2248.4411151522036" y="917.8589272756209"/>
+      <styles/>
+      <comment/>
+      <class>org.apache.nifi.processors.standard.GenerateFlowFile</class>
+      <maxConcurrentTasks>1</maxConcurrentTasks>
+      <schedulingPeriod>1 sec</schedulingPeriod>
+      <penalizationPeriod>30 sec</penalizationPeriod>
+      <yieldPeriod>1 sec</yieldPeriod>
+      <bulletinLevel>WARN</bulletinLevel>
+      <lossTolerant>false</lossTolerant>
+      <scheduledState>RUNNING</scheduledState>
+      <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
+      <runDurationNanos>0</runDurationNanos>
+      <property>
+        <name>File Size</name>
+        <value>1024 kB</value>
+      </property>
+      <property>
+        <name>Batch Size</name>
+        <value>1</value>
+      </property>
+      <property>
+        <name>Data Format</name>
+        <value>Text</value>
+      </property>
+      <property>
+        <name>Unique FlowFiles</name>
+        <value>false</value>
+      </property>
+    </processor>
+    <inputPort>
+      <id>471deef6-2a6e-4a7d-912a-81cc17e3a204</id>
+      <name> From Node A</name>
+      <position x="2305.369919163486" y="646.0466623031645"/>
+      <comments/>
+      <scheduledState>RUNNING</scheduledState>
+      <maxConcurrentTasks>1</maxConcurrentTasks>
+    </inputPort>
+    <outputPort>
+      <id>75f88005-0a87-4fef-8320-6219cdbcf18b</id>
+      <name>To A</name>
+      <position x="2915.739181824911" y="1057.8803860295386"/>
+      <comments/>
+      <scheduledState>RUNNING</scheduledState>
+      <maxConcurrentTasks>1</maxConcurrentTasks>
+    </outputPort>
+    <label>
+      <id>2f0db43e-1ce0-49ab-96a5-459c285aff09</id>
+      <position x="2197.3693058093504" y="849.4395700448451"/>
+      <size height="286.5726013183594" width="1012.2957763671875"/>
+      <styles>
+        <style name="font-size">18px</style>
+      </styles>
+      <value>Generate Data that is pushed to Node A and made available to be 
pulled</value>
+    </label>
+    <connection>
+      <id>7f869898-3a93-4e28-a60c-064789870574</id>
+      <name/>
+      <bendPoints/>
+      <labelIndex>1</labelIndex>
+      <zIndex>0</zIndex>
+      <sourceId>471deef6-2a6e-4a7d-912a-81cc17e3a204</sourceId>
+      <sourceGroupId>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</sourceGroupId>
+      <sourceType>INPUT_PORT</sourceType>
+      <destinationId>cd274fef-168a-486b-b21a-04ed17f981b7</destinationId>
+      
<destinationGroupId>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</destinationGroupId>
+      <destinationType>PROCESSOR</destinationType>
+      <relationship/>
+      <maxWorkQueueSize>0</maxWorkQueueSize>
+      <maxWorkQueueDataSize>0 MB</maxWorkQueueDataSize>
+      <flowFileExpiration>0 sec</flowFileExpiration>
+    </connection>
+    <connection>
+      <id>9dbc73f6-c827-4258-8bc7-06eb6a9b79d5</id>
+      <name/>
+      <bendPoints/>
+      <labelIndex>1</labelIndex>
+      <zIndex>0</zIndex>
+      <sourceId>4fa35a7d-d1f0-44e4-87d7-7d69f0b78b7b</sourceId>
+      <sourceGroupId>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</sourceGroupId>
+      <sourceType>PROCESSOR</sourceType>
+      <destinationId>75f88005-0a87-4fef-8320-6219cdbcf18b</destinationId>
+      
<destinationGroupId>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</destinationGroupId>
+      <destinationType>OUTPUT_PORT</destinationType>
+      <relationship>success</relationship>
+      <maxWorkQueueSize>0</maxWorkQueueSize>
+      <maxWorkQueueDataSize>0 MB</maxWorkQueueDataSize>
+      <flowFileExpiration>0 sec</flowFileExpiration>
+    </connection>
+  </rootGroup>
+  <controllerServices/>
+  <reportingTasks/>
+</flowController>

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/conf/nifi.properties
----------------------------------------------------------------------
diff --git a/conf/nifi.properties b/conf/nifi.properties
index c4f7dff..627876f 100644
--- a/conf/nifi.properties
+++ b/conf/nifi.properties
@@ -183,3 +183,9 @@ nifi.cluster.manager.safemode.duration=0 sec
 
 # kerberos #
 nifi.kerberos.krb5.file=
+
+# MiNiFi Server for Command Control
+nifi.server.name=localhost
+nifi.server.port=9000
+nifi.server.report.interval=1000 ms
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/inc/Exception.h
----------------------------------------------------------------------
diff --git a/inc/Exception.h b/inc/Exception.h
index c107123..d321454 100644
--- a/inc/Exception.h
+++ b/inc/Exception.h
@@ -34,6 +34,7 @@ enum ExceptionType
        PROCESSOR_EXCEPTION,
        PROCESS_SESSION_EXCEPTION,
        PROCESS_SCHEDULE_EXCEPTION,
+       SITE2SITE_EXCEPTION,
        GENERAL_EXCEPTION,
        MAX_EXCEPTION
 };
@@ -46,6 +47,7 @@ static const char *ExceptionStr[MAX_EXCEPTION] =
                "Processor Operation",
                "Process Session Operation",
                "Process Schedule Operation",
+               "Site2Site Protocol",
                "General Operation"
 };
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/inc/FlowControlProtocol.h
----------------------------------------------------------------------
diff --git a/inc/FlowControlProtocol.h b/inc/FlowControlProtocol.h
index ae31ce5..24416f2 100644
--- a/inc/FlowControlProtocol.h
+++ b/inc/FlowControlProtocol.h
@@ -248,6 +248,11 @@ public:
        }
        //! Run function for the thread
        static void run(FlowControlProtocol *protocol);
+       //! set 8 bytes SerialNumber
+       void setSerialNumber(uint8_t *number)
+       {
+               memcpy(_serialNumber, number, 8);
+       }
 
 protected:
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/inc/FlowController.h
----------------------------------------------------------------------
diff --git a/inc/FlowController.h b/inc/FlowController.h
index 4fa5810..3895096 100644
--- a/inc/FlowController.h
+++ b/inc/FlowController.h
@@ -42,8 +42,10 @@
 #include "ProcessGroup.h"
 #include "GenerateFlowFile.h"
 #include "LogAttribute.h"
+#include "RealTimeDataCollector.h"
 #include "TimerDrivenSchedulingAgent.h"
 #include "FlowControlProtocol.h"
+#include "RemoteProcessorGroupPort.h"
 
 //! Default NiFi Root Group Name
 #define DEFAULT_ROOT_GROUP_NAME ""
@@ -139,6 +141,11 @@ public:
        ProcessGroup *createRemoteProcessGroup(std::string name, uuid_t uuid);
        //! Create Connection
        Connection *createConnection(std::string name, uuid_t uuid);
+       //! set 8 bytes SerialNumber
+       void setSerialNumber(uint8_t *number)
+       {
+               _protocol->setSerialNumber(number);
+       }
 
 protected:
 
@@ -183,12 +190,16 @@ private:
        std::atomic<bool> _initialized;
        //! Process Processor Node XML
        void parseProcessorNode(xmlDoc *doc, xmlNode *processorNode, 
ProcessGroup *parent);
+       //! Process Port XML
+       void parsePort(xmlDoc *doc, xmlNode *processorNode, ProcessGroup 
*parent, TransferDirection direction);
        //! Process Root Processor Group XML
        void parseRootProcessGroup(xmlDoc *doc, xmlNode *node);
        //! Process Property XML
        void parseProcessorProperty(xmlDoc *doc, xmlNode *node, Processor 
*processor);
        //! Process connection XML
        void parseConnection(xmlDoc *doc, xmlNode *node, ProcessGroup *parent);
+       //! Process Remote Process Group
+       void parseRemoteProcessGroup(xmlDoc *doc, xmlNode *node, ProcessGroup 
*parent);
        // Prevent default copy constructor and assignment operation
        // Only support pass by reference or pointer
        FlowController(const FlowController &parent);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/inc/ProcessGroup.h
----------------------------------------------------------------------
diff --git a/inc/ProcessGroup.h b/inc/ProcessGroup.h
index 796142f..be69f2d 100644
--- a/inc/ProcessGroup.h
+++ b/inc/ProcessGroup.h
@@ -61,6 +61,41 @@ public:
        std::string getName(void) {
                return (_name);
        }
+       //! Set URL
+       void setURL(std::string url) {
+               _url = url;
+       }
+       //! Get URL
+       std::string getURL(void) {
+               return (_url);
+       }
+       //! SetTransmitting
+       void setTransmitting(bool val)
+       {
+               _transmitting = val;
+       }
+       //! Get Transmitting
+       bool getTransmitting()
+       {
+               return _transmitting;
+       }
+       //! setTimeOut
+       void setTimeOut(uint64_t time)
+       {
+               _timeOut = time;
+       }
+       uint64_t getTimeOut()
+       {
+               return _timeOut;
+       }
+       //! Set Processor yield period in MilliSecond
+       void setYieldPeriodMsec(uint64_t period) {
+               _yieldPeriodMsec = period;
+       }
+       //! Get Processor yield period in MilliSecond
+       uint64_t getYieldPeriodMsec(void) {
+               return(_yieldPeriodMsec);
+       }
        //! Set UUID
        void setUUID(uuid_t uuid) {
                uuid_copy(_uuid, uuid);
@@ -122,6 +157,13 @@ protected:
        std::set<Connection *> _connections;
        //! Parent Process Group
        ProcessGroup* _parentProcessGroup;
+       //! Yield Period in Milliseconds
+       std::atomic<uint64_t> _yieldPeriodMsec;
+       std::atomic<uint64_t> _timeOut;
+       //! URL
+       std::string _url;
+       //! Transmitting
+       std::atomic<bool> _transmitting;
 
 private:
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/inc/RealTimeDataCollector.h
----------------------------------------------------------------------
diff --git a/inc/RealTimeDataCollector.h b/inc/RealTimeDataCollector.h
new file mode 100644
index 0000000..760b566
--- /dev/null
+++ b/inc/RealTimeDataCollector.h
@@ -0,0 +1,131 @@
+/**
+ * @file RealTimeDataCollector.h
+ * RealTimeDataCollector class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __REAL_TIME_DATA_COLLECTOR_H__
+#define __REAL_TIME_DATA_COLLECTOR_H__
+
+#include <stdio.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <fcntl.h>
+#include <netdb.h>
+#include <string>
+#include <errno.h>
+#include "FlowFileRecord.h"
+#include "Processor.h"
+#include "ProcessSession.h"
+
+//! RealTimeDataCollector Class
+class RealTimeDataCollector : public Processor
+{
+public:
+       //! Constructor
+       /*!
+        * Create a new processor
+        */
+       RealTimeDataCollector(std::string name, uuid_t uuid = NULL)
+       : Processor(name, uuid)
+       {
+               _realTimeSocket = 0;
+               _batchSocket = 0;
+               _logger = Logger::getLogger();
+               _firstInvoking = false;
+               _realTimeAccumulated = 0;
+               _batchAcccumulated = 0;
+               _queuedDataSize = 0;
+       }
+       //! Destructor
+       virtual ~RealTimeDataCollector()
+       {
+               if (_realTimeSocket)
+                       close(_realTimeSocket);
+               if (_batchSocket)
+                       close(_batchSocket);
+               if (_fileStream.is_open())
+                       _fileStream.close();
+       }
+       //! Processor Name
+       static const std::string ProcessorName;
+       //! Supported Properties
+       static Property REALTIMESERVERNAME;
+       static Property REALTIMESERVERPORT;
+       static Property BATCHSERVERNAME;
+       static Property BATCHSERVERPORT;
+       static Property FILENAME;
+       static Property ITERATION;
+       static Property REALTIMEMSGID;
+       static Property BATCHMSGID;
+       static Property REALTIMEINTERVAL;
+       static Property BATCHINTERVAL;
+       static Property BATCHMAXBUFFERSIZE;
+       //! Supported Relationships
+       static Relationship Success;
+       //! Connect to the socket
+       int connectServer(const char *host, uint16_t port);
+       int sendData(int socket, const char *buf, int buflen);
+       void onTriggerRealTime(ProcessContext *context, ProcessSession 
*session);
+       void onTriggerBatch(ProcessContext *context, ProcessSession *session);
+
+public:
+       //! OnTrigger method, implemented by NiFi RealTimeDataCollector
+       virtual void onTrigger(ProcessContext *context, ProcessSession 
*session);
+       //! Initialize, overridden by NiFi RealTimeDataCollector
+       virtual void initialize(void);
+
+protected:
+
+private:
+       //! realtime server Name
+       std::string _realTimeServerName;
+       int64_t _realTimeServerPort;
+       std::string _batchServerName;
+       int64_t _batchServerPort;
+       int64_t _realTimeInterval;
+       int64_t _batchInterval;
+       int64_t _batchMaxBufferSize;
+       //! Match pattern for Real time Message ID
+       std::vector<std::string> _realTimeMsgID;
+       //! Match pattern for Batch Message ID
+       std::vector<std::string> _batchMsgID;
+       //! File that the real-time collector will tail
+       std::string _fileName;
+       //! Whether we need to iterate from the beginning for demo
+       bool _iteration;
+       int _realTimeSocket;
+       int _batchSocket;
+       //! Logger
+       Logger *_logger;
+       //! Mutex for protection
+       std::mutex _mtx;
+       //! Queued data size
+       uint64_t _queuedDataSize;
+       //! Queue for the batch process
+       std::queue<std::string> _queue;
+       std::thread::id _realTimeThreadId;
+       std::thread::id _batchThreadId;
+       std::atomic<bool> _firstInvoking;
+       int64_t _realTimeAccumulated;
+       int64_t _batchAcccumulated;
+       std::ifstream _fileStream;
+};
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/inc/RemoteProcessorGroupPort.h
----------------------------------------------------------------------
diff --git a/inc/RemoteProcessorGroupPort.h b/inc/RemoteProcessorGroupPort.h
new file mode 100644
index 0000000..cd99e50
--- /dev/null
+++ b/inc/RemoteProcessorGroupPort.h
@@ -0,0 +1,96 @@
+/**
+ * @file RemoteProcessorGroupPort.h
+ * RemoteProcessorGroupPort class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __REMOTE_PROCESSOR_GROUP_PORT_H__
+#define __REMOTE_PROCESSOR_GROUP_PORT_H__
+
+#include "FlowFileRecord.h"
+#include "Processor.h"
+#include "ProcessSession.h"
+#include "Site2SiteClientProtocol.h"
+
+//! RemoteProcessorGroupPort Class
+class RemoteProcessorGroupPort : public Processor
+{
+public:
+       //! Constructor
+       /*!
+        * Create a new processor
+        */
+       RemoteProcessorGroupPort(std::string name, uuid_t uuid = NULL)
+       : Processor(name, uuid)
+       {
+               _logger = Logger::getLogger();
+               _peer = new Site2SitePeer("", 9999);
+               _protocol = new Site2SiteClientProtocol(_peer);
+               _protocol->setPortId(uuid);
+       }
+       //! Destructor
+       virtual ~RemoteProcessorGroupPort()
+       {
+               delete _protocol;
+               delete _peer;
+       }
+       //! Processor Name
+       static const std::string ProcessorName;
+       //! Supported Properties
+       static Property hostName;
+       static Property port;
+       //! Supported Relationships
+       static Relationship relation;
+public:
+       //! OnTrigger method, implemented by NiFi RemoteProcessorGroupPort
+       virtual void onTrigger(ProcessContext *context, ProcessSession 
*session);
+       //! Initialize, overridden by NiFi RemoteProcessorGroupPort
+       virtual void initialize(void);
+       //! Set Direction
+       void setDirection(TransferDirection direction)
+       {
+               _direction = direction;
+               if (_direction == RECEIVE)
+                       this->setTriggerWhenEmpty(true);
+       }
+       //! Set Timeout
+       void setTimeOut(uint64_t timeout)
+       {
+               _protocol->setTimeOut(timeout);
+       }
+       //! SetTransmitting
+       void setTransmitting(bool val)
+       {
+               _transmitting = val;
+       }
+
+protected:
+
+private:
+       //! Logger
+       Logger *_logger;
+       //! Peer Connection
+       Site2SitePeer *_peer;
+       //! Peer Protocol
+       Site2SiteClientProtocol *_protocol;
+       //! Transaction Direction
+       TransferDirection _direction;
+       //! Transmitting
+       bool _transmitting;
+
+};
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/inc/Site2SiteClientProtocol.h
----------------------------------------------------------------------
diff --git a/inc/Site2SiteClientProtocol.h b/inc/Site2SiteClientProtocol.h
new file mode 100644
index 0000000..2a517d7
--- /dev/null
+++ b/inc/Site2SiteClientProtocol.h
@@ -0,0 +1,633 @@
+/**
+ * @file Site2SiteClientProtocol.h
+ * Site2SiteClientProtocol class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __SITE2SITE_CLIENT_PROTOCOL_H__
+#define __SITE2SITE_CLIENT_PROTOCOL_H__
+
+#include <stdio.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <fcntl.h>
+#include <netdb.h>
+#include <string>
+#include <errno.h>
+#include <chrono>
+#include <thread>
+#include <algorithm>
+#include <uuid/uuid.h>
+#include "Logger.h"
+#include "Configure.h"
+#include "Property.h"
+#include "Site2SitePeer.h"
+#include "FlowFileRecord.h"
+#include "ProcessContext.h"
+#include "ProcessSession.h"
+
+//! Resource Negotiated Status Code
+#define RESOURCE_OK 20
+#define DIFFERENT_RESOURCE_VERSION 21
+#define NEGOTIATED_ABORT 255
+//! Max attributes
+#define MAX_NUM_ATTRIBUTES 25000
+
+/**
+ * An enumeration for specifying the direction in which data should be
+ * transferred between a client and a remote NiFi instance.
+ */
+typedef enum {
+       /**
+        * * The client is to send data to the remote instance.
+        * */
+       SEND,
+       /**
+        * * The client is to receive data from the remote instance.
+        * */
+       RECEIVE
+} TransferDirection;
+
+
+//! Peer State
+typedef enum {
+       /**
+        * * IDLE
+        * */
+       IDLE = 0,
+       /**
+        * * Socket Established
+        * */
+       ESTABLISHED,
+       /**
+        * * HandShake Done
+        * */
+       HANDSHAKED,
+       /**
+        * * After Codec Negotiation Completion
+        * */
+       READY
+} PeerState;
+
+//! Transaction State
+typedef enum {
+       /**
+        * * Transaction has been started but no data has been sent or received.
+        * */
+       TRANSACTION_STARTED,
+       /**
+        * * Transaction has been started and data has been sent or received.
+        * */
+       DATA_EXCHANGED,
+       /**
+        * * Data that has been transferred has been confirmed via its CRC.
+        * * Transaction is ready to be completed.
+        * */
+       TRANSACTION_CONFIRMED,
+       /**
+        * * Transaction has been successfully completed.
+        * */
+       TRANSACTION_COMPLETED,
+       /**
+        * * The Transaction has been canceled.
+        * */
+       TRANSACTION_CANCELED,
+       /**
+        * * The Transaction ended in an error.
+        * */
+       TRANSACTION_ERROR
+} TransactionState;
+
+//! Request Type
+typedef enum {
+    NEGOTIATE_FLOWFILE_CODEC = 0,
+    REQUEST_PEER_LIST,
+    SEND_FLOWFILES,
+    RECEIVE_FLOWFILES,
+    SHUTDOWN,
+       MAX_REQUEST_TYPE
+} RequestType;
+
+//! Request Type Str
+static const char *RequestTypeStr[MAX_REQUEST_TYPE] =
+{
+               "NEGOTIATE_FLOWFILE_CODEC",
+               "REQUEST_PEER_LIST",
+               "SEND_FLOWFILES",
+               "RECEIVE_FLOWFILES",
+               "SHUTDOWN"
+};
+
+//! Respond Code
+typedef enum {
+               RESERVED = 0,
+               // Reserved for future use, so that ResponseCode can indicate a 0 followed by some 
other bytes
+
+               // handshaking properties
+               PROPERTIES_OK = 1,
+               UNKNOWN_PROPERTY_NAME = 230,
+               ILLEGAL_PROPERTY_VALUE = 231,
+               MISSING_PROPERTY = 232,
+               // transaction indicators
+               CONTINUE_TRANSACTION = 10,
+               FINISH_TRANSACTION = 11,
+               CONFIRM_TRANSACTION = 12, // "Explanation" of this code is the 
checksum
+               TRANSACTION_FINISHED = 13,
+               TRANSACTION_FINISHED_BUT_DESTINATION_FULL = 14,
+               CANCEL_TRANSACTION = 15,
+               BAD_CHECKSUM = 19,
+               // data availability indicators
+               MORE_DATA = 20,
+               NO_MORE_DATA = 21,
+               // port state indicators
+               UNKNOWN_PORT = 200,
+               PORT_NOT_IN_VALID_STATE = 201,
+               PORTS_DESTINATION_FULL = 202,
+               // authorization
+               UNAUTHORIZED = 240,
+               // error indicators
+               ABORT = 250,
+               UNRECOGNIZED_RESPONSE_CODE = 254,
+               END_OF_STREAM = 255
+} RespondCode;
+
+//! Respond Code Class
+typedef struct {
+       RespondCode code;
+       const char *description;
+       bool hasDescription;
+} RespondCodeContext;
+
+//! Respond Code Context
+static RespondCodeContext respondCodeContext[]  =
+{
+               {RESERVED, "Reserved for Future Use", false},
+               {PROPERTIES_OK, "Properties OK", false},
+               {UNKNOWN_PROPERTY_NAME, "Unknown Property Name", true},
+               {ILLEGAL_PROPERTY_VALUE, "Illegal Property Value", true},
+               {MISSING_PROPERTY, "Missing Property", true},
+               {CONTINUE_TRANSACTION, "Continue Transaction", false},
+               {FINISH_TRANSACTION, "Finish Transaction", false},
+               {CONFIRM_TRANSACTION, "Confirm Transaction", true},
+               {TRANSACTION_FINISHED, "Transaction Finished", false},
+               {TRANSACTION_FINISHED_BUT_DESTINATION_FULL, "Transaction 
Finished But Destination is Full", false},
+               {CANCEL_TRANSACTION, "Cancel Transaction", true},
+               {BAD_CHECKSUM, "Bad Checksum", false},
+               {MORE_DATA, "More Data Exists", false},
+               {NO_MORE_DATA, "No More Data Exists", false},
+               {UNKNOWN_PORT, "Unknown Port", false},
+           {PORT_NOT_IN_VALID_STATE, "Port Not in a Valid State", true},
+               {PORTS_DESTINATION_FULL, "Port's Destination is Full", false},
+               {UNAUTHORIZED, "User Not Authorized", true},
+               {ABORT, "Abort", true},
+               {UNRECOGNIZED_RESPONSE_CODE, "Unrecognized Response Code", 
false},
+               {END_OF_STREAM, "End of Stream", false}
+};
+
+//! Respond Code Sequence Pattern
+static const uint8_t CODE_SEQUENCE_VALUE_1 = (uint8_t) 'R';
+static const uint8_t CODE_SEQUENCE_VALUE_2 = (uint8_t) 'C';
+
+/**
+ * Enumeration of Properties that can be used for the Site-to-Site Socket
+ * Protocol.
+ */
+typedef enum {
+               /**
+                * Boolean value indicating whether or not the contents of a 
FlowFile should
+                * be GZipped when transferred.
+                */
+               GZIP,
+               /**
+                * The unique identifier of the port to communicate with
+                */
+               PORT_IDENTIFIER,
+               /**
+                * Indicates the number of milliseconds after the request was 
made that the
+                * client will wait for a response. If no response has been 
received by the
+                * time this value expires, the server can move on without 
attempting to
+                * service the request because the client will have already 
disconnected.
+                */
+               REQUEST_EXPIRATION_MILLIS,
+               /**
+                * The preferred number of FlowFiles that the server should 
send to the
+                * client when pulling data. This property was introduced in 
version 5 of
+                * the protocol.
+                */
+               BATCH_COUNT,
+               /**
+                * The preferred number of bytes that the server should send to 
the client
+                * when pulling data. This property was introduced in version 5 
of the
+                * protocol.
+                */
+               BATCH_SIZE,
+               /**
+                * The preferred amount of time that the server should send 
data to the
+                * client when pulling data. This property was introduced in 
version 5 of
+                * the protocol. Value is in milliseconds.
+                */
+               BATCH_DURATION,
+               MAX_HANDSHAKE_PROPERTY
+} HandshakeProperty;
+
+//! HandShakeProperty Str
+static const char *HandShakePropertyStr[MAX_HANDSHAKE_PROPERTY] =
+{
+               /**
+                * Boolean value indicating whether or not the contents of a 
FlowFile should
+                * be GZipped when transferred.
+                */
+               "GZIP",
+               /**
+                * The unique identifier of the port to communicate with
+                */
+               "PORT_IDENTIFIER",
+               /**
+                * Indicates the number of milliseconds after the request was 
made that the
+                * client will wait for a response. If no response has been 
received by the
+                * time this value expires, the server can move on without 
attempting to
+                * service the request because the client will have already 
disconnected.
+                */
+               "REQUEST_EXPIRATION_MILLIS",
+               /**
+                * The preferred number of FlowFiles that the server should 
send to the
+                * client when pulling data. This property was introduced in 
version 5 of
+                * the protocol.
+                */
+               "BATCH_COUNT",
+               /**
+                * The preferred number of bytes that the server should send to 
the client
+                * when pulling data. This property was introduced in version 5 
of the
+                * protocol.
+                */
+               "BATCH_SIZE",
+               /**
+                * The preferred amount of time that the server should send 
data to the
+                * client when pulling data. This property was introduced in 
version 5 of
+                * the protocol. Value is in milliseconds.
+                */
+               "BATCH_DURATION"
+};
+
+class Site2SiteClientProtocol;
+
+//! Transaction Class
+class Transaction
+{
+       friend class Site2SiteClientProtocol;
+public:
+       //! Constructor
+       /*!
+        * Create a new transaction
+        */
+       Transaction(TransferDirection direction) {
+               _state = TRANSACTION_STARTED;
+               _direction = direction;
+               _dataAvailable = false;
+               _transfers = 0;
+               _bytes = 0;
+
+               char uuidStr[37];
+
+               // Generate the global UUID for the transaction
+               uuid_generate(_uuid);
+               uuid_unparse(_uuid, uuidStr);
+               _uuidStr = uuidStr;
+       }
+       //! Destructor
+       virtual ~Transaction()
+       {
+       }
+       //! getUUIDStr
+       std::string getUUIDStr()
+       {
+               return _uuidStr;
+       }
+       //! getState
+       TransactionState getState()
+       {
+               return _state;
+       }
+       //! isDataAvailable
+       bool isDataAvailable()
+       {
+               return _dataAvailable;
+       }
+       //! setDataAvailable()
+       void setDataAvailable(bool value)
+       {
+               _dataAvailable = value;
+       }
+       //! getDirection
+       TransferDirection getDirection()
+       {
+               return _direction;
+       }
+       //! getCRC
+       long getCRC()
+       {
+               return _crc.getCRC();
+       }
+       //! updateCRC
+       void updateCRC(uint8_t *buffer, uint32_t length)
+       {
+               _crc.update(buffer, length);
+       }
+
+protected:
+
+private:
+       //! Transaction State
+       TransactionState _state;
+       //! Transaction Direction
+       TransferDirection _direction;
+       //! Whether received data is available
+       bool _dataAvailable;
+       //! A global unique identifier
+       uuid_t _uuid;
+       //! UUID string
+       std::string _uuidStr;
+       //! Number of transfers
+       int _transfers;
+       //! Number of content bytes
+       uint64_t _bytes;
+       //! CRC32
+       CRC32 _crc;
+
+       // Prevent default copy constructor and assignment operation
+       // Only support pass by reference or pointer
+       Transaction(const Transaction &parent);
+       Transaction &operator=(const Transaction &parent);
+};
+
+/**
+ * Represents a piece of data that is to be sent to or that was received from a
+ * NiFi instance.
+ */
+class DataPacket
+{
+public:
+       DataPacket(Site2SiteClientProtocol *protocol, Transaction *transaction,
+                       std::map<std::string, std::string> attributes) {
+               _protocol = protocol;
+               _size = 0;
+               _transaction = transaction;
+               _attributes = attributes;
+       }
+       std::map<std::string, std::string> _attributes;
+       uint64_t _size;
+       Site2SiteClientProtocol *_protocol;
+       Transaction *_transaction;
+};
+
+//! Site2SiteClientProtocol Class
+class Site2SiteClientProtocol
+{
+public:
+       //! Constructor
+       /*!
+        * Create a new Site2Site client protocol
+        */
+       Site2SiteClientProtocol(Site2SitePeer *peer) {
+               _logger = Logger::getLogger();
+               _configure = Configure::getConfigure();
+               _peer = peer;
+               _batchSize = 0;
+               _batchCount = 0;
+               _batchDuration = 0;
+               _batchSendNanos = 5000000000; // 5 seconds
+               _timeOut = 30000; // 30 seconds
+               _peerState = IDLE;
+               _supportedVersion[0] = 5;
+               _supportedVersion[1] = 4;
+               _supportedVersion[2] = 3;
+               _supportedVersion[3] = 2;
+               _supportedVersion[4] = 1;
+               _currentVersion = _supportedVersion[0];
+               _currentVersionIndex = 0;
+               _supportedCodecVersion[0] = 1;
+               _currentCodecVersion = _supportedCodecVersion[0];
+               _currentCodecVersionIndex = 0;
+       }
+       //! Destructor
+       virtual ~Site2SiteClientProtocol()
+       {
+       }
+
+public:
+       //! setBatchSize
+       void setBatchSize(uint64_t size)
+       {
+               _batchSize = size;
+       }
+       //! setBatchCount
+       void setBatchCount(uint64_t count)
+       {
+               _batchCount = count;
+       }
+       //! setBatchDuration
+       void setBatchDuration(uint64_t duration)
+       {
+               _batchDuration = duration;
+       }
+       //! setTimeOut
+       void setTimeOut(uint64_t time)
+       {
+               _timeOut = time;
+               if (_peer)
+                       _peer->setTimeOut(time);
+
+       }
+       //! setPortId
+       void setPortId(uuid_t id)
+       {
+               uuid_copy(_portId, id);
+               char idStr[37];
+               uuid_unparse(id, idStr);
+               _portIdStr = idStr;
+       }
+       //! getResourceName
+       std::string getResourceName()
+       {
+               return "SocketFlowFileProtocol";
+       }
+       //! getCodecResourceName
+       std::string getCodecResourceName()
+       {
+               return "StandardFlowFileCodec";
+       }
+       //! bootstrap the protocol to the ready for transaction state by going 
through the state machine
+       bool bootstrap();
+       //! establish
+       bool establish();
+       //! handShake
+       bool handShake();
+       //! negotiateCodec
+       bool negotiateCodec();
+       //! initiateResourceNegotiation
+       bool initiateResourceNegotiation();
+       //! initiateCodecResourceNegotiation
+       bool initiateCodecResourceNegotiation();
+       //! tearDown
+       void tearDown();
+       //! write Request Type
+       int writeRequestType(RequestType type);
+       //! read Request Type
+       int readRequestType(RequestType &type);
+       //! read Respond
+       int readRespond(RespondCode &code, std::string &message);
+       //! write respond
+       int writeRespond(RespondCode code, std::string message);
+       //! getRespondCodeContext
+       RespondCodeContext *getRespondCodeContext(RespondCode code)
+       {
+               for (unsigned int i = 0; i < 
sizeof(respondCodeContext)/sizeof(RespondCodeContext); i++)
+               {
+                       if (respondCodeContext[i].code == code)
+                       {
+                               return &respondCodeContext[i];
+                       }
+               }
+               return NULL;
+       }
+       //! getPeer
+       Site2SitePeer *getPeer()
+       {
+               return _peer;
+       }
+       //! Creation of a new transaction, return the transaction ID if success,
+       //! Return NULL when any error occurs
+       Transaction *createTransaction(std::string &transactionID, 
TransferDirection direction);
+       //! Receive the data packet from the transaction
+       //! Return false when any error occurs
+       bool receive(std::string transactionID, DataPacket *packet, bool &eof);
+       //! Send the data packet from the transaction
+       //! Return false when any error occurs
+       bool send(std::string transactionID, DataPacket *packet, FlowFileRecord 
*flowFile, ProcessSession *session);
+       //! Confirm the data that was sent or received by comparing CRC32's of 
the data sent and the data received.
+       bool confirm(std::string transactionID);
+       //! Cancel the transaction
+       void cancel(std::string transactionID);
+       //! Complete the transaction
+       bool complete(std::string transactionID);
+       //! Error the transaction
+       void error(std::string transactionID);
+       //! Receive flow files for the process session
+       void receiveFlowFiles(ProcessContext *context, ProcessSession *session);
+       //! Transfer flow files for the process session
+       void transferFlowFiles(ProcessContext *context, ProcessSession 
*session);
+       //! deleteTransaction
+       void deleteTransaction(std::string transactionID);
+       //! Nested Callback Class for write stream
+       class WriteCallback : public OutputStreamCallback
+       {
+               public:
+               WriteCallback(DataPacket *packet)
+               : _packet(packet) {}
+               DataPacket *_packet;
+               void process(std::ofstream *stream) {
+                       uint8_t buffer[8192];
+                       int len = _packet->_size;
+                       while (len > 0)
+                       {
+                               int size = std::min(len, (int) sizeof(buffer));
+                               int ret = 
_packet->_protocol->_peer->readData(buffer, size, &_packet->_transaction->_crc);
+                               if (ret != size)
+                               {
+                                       
_packet->_protocol->_logger->log_error("Site2Site Receive Flow Size %d Failed 
%d", size, ret);
+                                       break;
+                               }
+                               stream->write((const char *) buffer, size);
+                               len -= size;
+                       }
+               }
+       };
+       //! Nested Callback Class for read stream
+       class ReadCallback : public InputStreamCallback
+       {
+               public:
+               ReadCallback(DataPacket *packet)
+               : _packet(packet) {}
+               DataPacket *_packet;
+               void process(std::ifstream *stream) {
+                       _packet->_size = 0;
+                       uint8_t buffer[8192];
+                       int readSize;
+                       while (stream->good())
+                       {
+                               if (!stream->read((char *)buffer, 8192))
+                                       readSize = stream->gcount();
+                               else
+                                       readSize = 8192;
+                               int ret = 
_packet->_protocol->_peer->write(buffer, readSize, 
&_packet->_transaction->_crc);
+                               if (ret != readSize)
+                               {
+                                       
_packet->_protocol->_logger->log_error("Site2Site Send Flow Size %d Failed %d", 
readSize, ret);
+                                       break;
+                               }
+                               _packet->_size += readSize;
+                       }
+               }
+       };
+
+protected:
+
+private:
+       //! Mutex for protection
+       std::mutex _mtx;
+       //! Logger
+       Logger *_logger;
+       //! Configure
+       Configure *_configure;
+       //! Batch Count
+       std::atomic<uint64_t> _batchCount;
+       //! Batch Size
+       std::atomic<uint64_t> _batchSize;
+       //! Batch Duration in msec
+       std::atomic<uint64_t> _batchDuration;
+       //! Timeout in msec
+       std::atomic<uint64_t> _timeOut;
+       //! Peer Connection
+       Site2SitePeer *_peer;
+       //! portId
+       uuid_t _portId;
+       //! portIDStr
+       std::string _portIdStr;
+       //! Batch send time in nanoseconds
+       uint64_t _batchSendNanos;
+       //! Peer State
+       PeerState _peerState;
+       uint32_t _supportedVersion[5];
+       uint32_t _currentVersion;
+       int _currentVersionIndex;
+       uint32_t _supportedCodecVersion[1];
+       uint32_t _currentCodecVersion;
+       int _currentCodecVersionIndex;
+       //! commsIdentifier
+       std::string _commsIdentifier;
+       //! transaction map
+       std::map<std::string, Transaction *> _transactionMap;
+
+       // Prevent default copy constructor and assignment operation
+       // Only support pass by reference or pointer
+       Site2SiteClientProtocol(const Site2SiteClientProtocol &parent);
+       Site2SiteClientProtocol &operator=(const Site2SiteClientProtocol 
&parent);
+};
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/inc/Site2SitePeer.h
----------------------------------------------------------------------
diff --git a/inc/Site2SitePeer.h b/inc/Site2SitePeer.h
new file mode 100644
index 0000000..e6972ad
--- /dev/null
+++ b/inc/Site2SitePeer.h
@@ -0,0 +1,359 @@
+/**
+ * @file Site2SitePeer.h
+ * Site2SitePeer class declaration for site to site peer  
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __SITE2SITE_PEER_H__
+#define __SITE2SITE_PEER_H__
+
+#include <stdio.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <fcntl.h>
+#include <netdb.h>
+#include <string>
+#include <errno.h>
+#include <mutex>
+#include <atomic>
+#include "TimeUtil.h"
+#include "Logger.h"
+#include "Configure.h"
+#include "Property.h"
+
+//! CRC32 Class
+/*!
+ * Running CRC-32 checksum using the reflected polynomial 0xedb88320.
+ * The 256-entry lookup table is shared by every instance and is built the
+ * first time a CRC32 object is constructed.
+ */
+class CRC32
+{
+public:
+    //! Constructor
+    /*!
+     * Reset the running checksum to 0 and make sure the shared lookup
+     * table has been built.
+     */
+    CRC32() {
+        crc = 0;
+
+        // Build the table exactly once. std::call_once makes concurrent
+        // constructors wait until the table is complete, so no thread can
+        // observe a partially filled table.
+        static std::once_flag tableOnce;
+        std::call_once(tableOnce, []() {
+            const unsigned int poly = 0xedb88320;
+            for (unsigned int i = 0; i < 256; ++i) {
+                unsigned int temp = i;
+                for (int j = 8; j > 0; --j) {
+                    if ((temp & 1) == 1) {
+                        temp = (temp >> 1) ^ poly;
+                    } else {
+                        temp >>= 1;
+                    }
+                }
+                table[i] = temp;
+            }
+            // Published last so the flag never claims more than is true.
+            tableInit = true;
+        });
+    }
+
+    //! Fold size bytes into the running checksum
+    /*!
+     * @param bytes data to checksum
+     * @param size  number of bytes to read from bytes
+     * @return the checksum over everything passed to update() so far
+     */
+    unsigned int update(uint8_t * bytes, size_t size) {
+        // The stored value is kept in its final (inverted) form between
+        // calls, so undo the inversion, process the bytes, and re-apply it.
+        crc = crc ^ ~0U;
+        for (size_t i = 0; i < size; ++i) {
+            uint8_t index = (uint8_t)((crc & 0xff) ^ bytes[i]);
+            crc = (crc >> 8) ^ table[index];
+        }
+        crc = crc ^ ~0U;
+        return crc;
+    }
+
+    //! Get the checksum accumulated so far
+    long getCRC()
+    {
+        return crc;
+    }
+
+private:
+    //! Lookup table shared by all instances
+    static unsigned int table[256];
+    //! Set once the lookup table has been completely built
+    static std::atomic<bool> tableInit;
+    //! Running checksum of this instance
+    unsigned int crc;
+};
+
+static const char MAGIC_BYTES[] = {'N', 'i', 'F', 'i'};
+
+//! Site2SitePeer Class
+/*!
+ * One remote site-to-site endpoint, identified by host and port
+ * ("nifi://host:port"). Holds the socket to that endpoint, the yield
+ * (back-off) bookkeeping for the peer as a whole and per destination port id,
+ * and helpers that write/read integers most-significant byte first.
+ * The per-port yield map is guarded by _mtx; the scalar timing fields are
+ * atomics.
+ */
+class Site2SitePeer
+{
+public:
+       //! Constructor
+       /*!
+        * Create a new site2site peer
+        */
+       Site2SitePeer(std::string host, uint16_t port) {
+               _logger = Logger::getLogger();
+               _configure = Configure::getConfigure();
+               _socket = 0;
+               _host = host;
+               _port = port;
+               // A default-constructed std::atomic holds an indeterminate
+               // value, and yield()/getYieldPeriodMsec() read this member, so
+               // give it a defined starting value.
+               _yieldPeriodMsec = 0;
+               _yieldExpiration = 0;
+               _timeOut = 30000; // 30 seconds
+               _url = "nifi://" + _host + ":" + std::to_string(_port);
+       }
+       //! Destructor
+       virtual ~Site2SitePeer() { Close();}
+       //! Set Processor yield period in MilliSecond
+       void setYieldPeriodMsec(uint64_t period) {
+               _yieldPeriodMsec = period;
+       }
+       //! get URL
+       std::string getURL() {
+               return _url;
+       }
+       //! Get Processor yield period in MilliSecond
+       uint64_t getYieldPeriodMsec(void) {
+               return(_yieldPeriodMsec);
+       }
+       //! Yield based on the yield period
+       void yield()
+       {
+               _yieldExpiration = (getTimeMillis() + _yieldPeriodMsec);
+       }
+       //! setHostName, also rebuilds the URL
+       void setHostName(std::string host)
+       {
+               _host = host;
+               _url = "nifi://" + _host + ":" + std::to_string(_port);
+       }
+       //! setPort, also rebuilds the URL
+       void setPort(uint16_t port)
+       {
+               _port = port;
+               _url = "nifi://" + _host + ":" + std::to_string(_port);
+       }
+       //! getHostName
+       std::string getHostName()
+       {
+               return _host;
+       }
+       //! getPort
+       uint16_t getPort()
+       {
+               return _port;
+       }
+       //! Yield based on the input time
+       void yield(uint64_t time)
+       {
+               _yieldExpiration = (getTimeMillis() + time);
+       }
+       //! whether need be to yield; an expiration of 0 means "not yielding"
+       bool isYield()
+       {
+               if (_yieldExpiration > 0)
+                       return (_yieldExpiration >= getTimeMillis());
+               else
+                       return false;
+       }
+       //! clear yield expiration
+       void clearYield()
+       {
+               _yieldExpiration = 0;
+       }
+       //! Yield the given port id based on the yield period
+       void yield(std::string portId)
+       {
+               std::lock_guard<std::mutex> lock(_mtx);
+               uint64_t yieldExpiration = (getTimeMillis() + _yieldPeriodMsec);
+               _yieldExpirationPortIdMap[portId] = yieldExpiration;
+       }
+       //! Yield the given port id based on the input time
+       void yield(std::string portId, uint64_t time)
+       {
+               std::lock_guard<std::mutex> lock(_mtx);
+               uint64_t yieldExpiration = (getTimeMillis() + time);
+               _yieldExpirationPortIdMap[portId] = yieldExpiration;
+       }
+       //! whether the given port id need be to yield
+       bool isYield(std::string portId)
+       {
+               std::lock_guard<std::mutex> lock(_mtx);
+               std::map<std::string, uint64_t>::iterator it = this->_yieldExpirationPortIdMap.find(portId);
+               if (it != _yieldExpirationPortIdMap.end())
+               {
+                       uint64_t yieldExpiration = it->second;
+                       return (yieldExpiration >= getTimeMillis());
+               }
+               else
+               {
+                       // no entry: this port id never yielded or was cleared
+                       return false;
+               }
+       }
+       //! clear yield expiration of the given port id
+       void clearYield(std::string portId)
+       {
+               std::lock_guard<std::mutex> lock(_mtx);
+               std::map<std::string, uint64_t>::iterator it = this->_yieldExpirationPortIdMap.find(portId);
+               if (it != _yieldExpirationPortIdMap.end())
+               {
+                       _yieldExpirationPortIdMap.erase(portId);
+               }
+       }
+       //! setTimeOut
+       void setTimeOut(uint64_t time)
+       {
+               _timeOut = time;
+       }
+       //! write one byte; returns what sendData returns
+       int write(uint8_t value, CRC32 *crc = NULL)
+       {
+               return sendData(&value, 1, crc);
+       }
+       //! write one char as a single byte
+       int write(char value, CRC32 *crc = NULL)
+       {
+               return sendData((uint8_t *)&value, 1, crc);
+       }
+       //! write a 32 bit value, most significant byte first
+       int write(uint32_t value, CRC32 *crc = NULL)
+       {
+               uint8_t temp[4];
+
+               temp[0] = (value & 0xFF000000) >> 24;
+               temp[1] = (value & 0x00FF0000) >> 16;
+               temp[2] = (value & 0x0000FF00) >> 8;
+               temp[3] = (value & 0x000000FF);
+               return sendData(temp, 4, crc);
+       }
+       //! write a 16 bit value, most significant byte first
+       int write(uint16_t value, CRC32 *crc = NULL)
+       {
+               uint8_t temp[2];
+               temp[0] = (value & 0xFF00) >> 8;
+               temp[1] = (value & 0xFF);
+               return sendData(temp, 2, crc);
+       }
+       //! write len raw bytes
+       int write(uint8_t *value, int len, CRC32 *crc = NULL)
+       {
+               return sendData(value, len, crc);
+       }
+       //! write a 64 bit value, most significant byte first
+       int write(uint64_t value, CRC32 *crc = NULL)
+       {
+               uint8_t temp[8];
+
+               temp[0] = (value >> 56) & 0xFF;
+               temp[1] = (value >> 48) & 0xFF;
+               temp[2] = (value >> 40) & 0xFF;
+               temp[3] = (value >> 32) & 0xFF;
+               temp[4] = (value >> 24) & 0xFF;
+               temp[5] = (value >> 16) & 0xFF;
+               temp[6] = (value >>  8) & 0xFF;
+               temp[7] = (value >>  0) & 0xFF;
+               return sendData(temp, 8, crc);
+       }
+       //! write a bool as a single byte
+       int write(bool value, CRC32 *crc = NULL)
+       {
+               uint8_t temp = value;
+               return write(temp, crc);
+       }
+       int writeUTF(std::string str, bool widen = false, CRC32 *crc = NULL);
+       //! read one byte; value is only assigned when the full size was read
+       int read(uint8_t &value, CRC32 *crc = NULL)
+       {
+               uint8_t buf;
+
+               int ret = readData(&buf, 1, crc);
+               if (ret == 1)
+                       value = buf;
+               return ret;
+       }
+       //! read a 16 bit value sent most significant byte first
+       int read(uint16_t &value, CRC32 *crc = NULL)
+       {
+               uint8_t buf[2];
+
+               int ret = readData(buf, 2, crc);
+               if (ret == 2)
+                       value = (uint16_t) (((unsigned int) buf[0] << 8) | buf[1]);
+               return ret;
+       }
+       //! read one char
+       int read(char &value, CRC32 *crc = NULL)
+       {
+               uint8_t buf;
+
+               int ret = readData(&buf, 1, crc);
+               if (ret == 1)
+                       value = (char) buf;
+               return ret;
+       }
+       //! read len raw bytes
+       int read(uint8_t *value, int len, CRC32 *crc = NULL)
+       {
+               return readData(value, len, crc);
+       }
+       //! read a 32 bit value sent most significant byte first
+       int read(uint32_t &value, CRC32 *crc = NULL)
+       {
+               uint8_t buf[4];
+
+               int ret = readData(buf, 4, crc);
+               if (ret == 4)
+               {
+                       // widen to uint32_t before shifting so the top byte is
+                       // never shifted into the sign bit of a promoted int
+                       value = ((uint32_t) buf[0] << 24) |
+                                       ((uint32_t) buf[1] << 16) |
+                                       ((uint32_t) buf[2] << 8) |
+                                       (uint32_t) buf[3];
+               }
+               return ret;
+       }
+       //! read a 64 bit value sent most significant byte first
+       int read(uint64_t &value, CRC32 *crc = NULL)
+       {
+               uint8_t buf[8];
+
+               int ret = readData(buf, 8, crc);
+               if (ret == 8)
+               {
+                       value = ((uint64_t) buf[0] << 56) |
+                                       ((uint64_t) (buf[1] & 255) << 48) |
+                                       ((uint64_t) (buf[2] & 255) << 40) |
+                                       ((uint64_t) (buf[3] & 255) << 32) |
+                                       ((uint64_t) (buf[4] & 255) << 24) |
+                                       ((uint64_t) (buf[5] & 255) << 16) |
+                                       ((uint64_t) (buf[6] & 255) <<  8) |
+                                       ((uint64_t) (buf[7] & 255) <<  0);
+               }
+               return ret;
+       }
+       int readUTF(std::string &str, bool widen = false, CRC32 *crc = NULL);
+       //! open connection to the peer
+       bool Open();
+       //! close connection to the peer
+       void Close();
+       //! Send Data via the socket, return -1 for failure
+       int sendData(uint8_t *buf, int buflen, CRC32 *crc = NULL);
+       //! Read length into buf, return -1 for failure and 0 for EOF
+       int readData(uint8_t *buf, int buflen, CRC32 *crc = NULL);
+       //! Select on the socket
+       int Select(int msec);
+
+protected:
+
+private:
+       //! Mutex for protection
+       std::mutex _mtx;
+       //! S2S server Name
+       std::string _host;
+       //! S2S server port
+       uint16_t _port;
+       //! socket to server
+       int _socket;
+       //! URL
+       std::string _url;
+       //! socket timeout;
+       std::atomic<uint64_t> _timeOut;
+       //! Logger
+       Logger *_logger;
+       //! Configure
+       Configure *_configure;
+       //! Yield Period in Milliseconds
+       std::atomic<uint64_t> _yieldPeriodMsec;
+       //! Yield Expiration
+       std::atomic<uint64_t> _yieldExpiration;
+       //! Yield Expiration per destination PortID
+       std::map<std::string, uint64_t> _yieldExpirationPortIdMap;
+       // Prevent default copy constructor and assignment operation
+       // Only support pass by reference or pointer
+       Site2SitePeer(const Site2SitePeer &parent);
+       Site2SitePeer &operator=(const Site2SitePeer &parent);
+};
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/inc/TimeUtil.h
----------------------------------------------------------------------
diff --git a/inc/TimeUtil.h b/inc/TimeUtil.h
index 0c9bac4..b024245 100644
--- a/inc/TimeUtil.h
+++ b/inc/TimeUtil.h
@@ -73,7 +73,7 @@ inline std::string getTimeStr(uint64_t msec)
 
        std::string ret = date;
        date[0] = '\0';
-       sprintf(date, ".%03llu", msec);
+       sprintf(date, ".%03llu", (unsigned long long) msec);
 
        ret += date;
        return ret;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/main/MiNiFiMain.cpp
----------------------------------------------------------------------
diff --git a/main/MiNiFiMain.cpp b/main/MiNiFiMain.cpp
index 4d9db5d..1ceaba5 100644
--- a/main/MiNiFiMain.cpp
+++ b/main/MiNiFiMain.cpp
@@ -52,7 +52,7 @@ void sigHandler(int signal)
 int main(int argc, char **argv)
 {
        Logger *logger = Logger::getLogger();
-       logger->setLogLevel(info);
+       logger->setLogLevel(trace);
        logger->log_info("MiNiFi started");
 
        if (signal(SIGINT, sigHandler) == SIG_ERR || signal(SIGTERM, 
sigHandler) == SIG_ERR)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/src/FlowControlProtocol.cpp
----------------------------------------------------------------------
diff --git a/src/FlowControlProtocol.cpp b/src/FlowControlProtocol.cpp
index 6aaa969..6f1517c 100644
--- a/src/FlowControlProtocol.cpp
+++ b/src/FlowControlProtocol.cpp
@@ -215,6 +215,8 @@ int FlowControlProtocol::readHdr(FlowControlProtocolHeader 
*hdr)
 
 void FlowControlProtocol::start()
 {
+       if (_reportInterval <= 0)
+               return;
        if (_running)
                return;
        _running = true;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/src/FlowController.cpp b/src/FlowController.cpp
index a2cafbc..b18953e 100644
--- a/src/FlowController.cpp
+++ b/src/FlowController.cpp
@@ -136,6 +136,10 @@ Processor *FlowController::createProcessor(std::string 
name, uuid_t uuid)
        {
                processor = new LogAttribute(name, uuid);
        }
+       else if (name == RealTimeDataCollector::ProcessorName)
+       {
+               processor = new RealTimeDataCollector(name, uuid);
+       }
        else
        {
                _logger->log_error("No Processor defined for %s", name.c_str());
@@ -269,10 +273,20 @@ void FlowController::parseConnection(xmlDoc *doc, xmlNode 
*node, ProcessGroup *p
                                if (temp)
                                {
                                        std::string relationshipName = temp;
-                                       Relationship 
relationship(relationshipName, "");
-                                       _logger->log_debug("parseConnection: 
relationship => [%s]", relationshipName.c_str());
-                                       if (connection)
-                                               
connection->setRelationship(relationship);
+                                       if (!relationshipName.empty())
+                                       {
+                                               Relationship 
relationship(relationshipName, "");
+                                               
_logger->log_debug("parseConnection: relationship => [%s]", 
relationshipName.c_str());
+                                               if (connection)
+                                                       
connection->setRelationship(relationship);
+                                       }
+                                       else
+                                       {
+                                               Relationship empty;
+                                               
_logger->log_debug("parseConnection: relationship => [%s]", 
empty.getName().c_str());
+                                               if (connection)
+                                                       
connection->setRelationship(empty);
+                                       }
                                        xmlFree(temp);
                                }
                        }
@@ -334,9 +348,110 @@ void FlowController::parseRootProcessGroup(xmlDoc *doc, 
xmlNode *node)
                        {
                                this->parseConnection(doc, currentNode, group);
                        }
+                       else if (xmlStrcmp(currentNode->name, BAD_CAST 
"remoteProcessGroup") == 0)
+                       {
+                               this->parseRemoteProcessGroup(doc, currentNode, 
group);
+                       }
                } // if (currentNode->type == XML_ELEMENT_NODE)
       } // for node
+}
 
+//! Parse a <remoteProcessGroup> element and attach the group to parent
+/*!
+ * Walks the children of node. <id> overrides the randomly generated UUID,
+ * <name> creates the remote process group and registers it with parent, and
+ * <yieldPeriod>, <timeout>, <transmitting>, <inputPort> and <outputPort> are
+ * applied to that group. Because they are applied as they are met, any of
+ * those elements that precede <name> are ignored (the group does not exist
+ * yet), and an <id> that follows <name> does not affect the created group.
+ * Returns early if the group cannot be created.
+ */
+void FlowController::parseRemoteProcessGroup(xmlDoc *doc, xmlNode *node, ProcessGroup *parent)
+{
+       uuid_t uuid;
+       xmlNode *currentNode;
+       ProcessGroup *group = NULL;
+       int64_t yieldPeriod = -1;
+       int64_t timeOut = -1;
+
+       // generate the random UUID, used unless an <id> element supplies one
+       uuid_generate(uuid);
+
+       for (currentNode = node->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next)
+       {
+               if (currentNode->type == XML_ELEMENT_NODE)
+               {
+                       if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0)
+                       {
+                               char *id = (char *) xmlNodeGetContent(currentNode);
+                               if (id)
+                               {
+                                       _logger->log_debug("parseRemoteProcessGroup: id => [%s]", id);
+                                       uuid_parse(id, uuid);
+                                       xmlFree(id);
+                               }
+                       }
+                       else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0)
+                       {
+                               char *name = (char *) xmlNodeGetContent(currentNode);
+                               if (name)
+                               {
+                                       _logger->log_debug("parseRemoteProcessGroup: name => [%s]", name);
+                                       group = this->createRemoteProcessGroup(name, uuid);
+                                       if (group == NULL)
+                                       {
+                                               xmlFree(name);
+                                               return;
+                                       }
+                                       group->setParent(parent);
+                                       parent->addProcessGroup(group);
+                                       xmlFree(name);
+                               }
+                       }
+                       else if (xmlStrcmp(currentNode->name, BAD_CAST "yieldPeriod") == 0)
+                       {
+                               TimeUnit unit;
+                               char *temp = (char *) xmlNodeGetContent(currentNode);
+                               if (temp)
+                               {
+                                       if (Property::StringToTime(temp, yieldPeriod, unit) &&
+                                                       Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod) && group)
+                                       {
+                                               // int64_t must not be passed to %d; print it as long long
+                                               _logger->log_debug("parseRemoteProcessGroup: yieldPeriod => [%lld] ms", (long long) yieldPeriod);
+                                               group->setYieldPeriodMsec(yieldPeriod);
+                                       }
+                                       xmlFree(temp);
+                               }
+                       }
+                       else if (xmlStrcmp(currentNode->name, BAD_CAST "timeout") == 0)
+                       {
+                               TimeUnit unit;
+                               char *temp = (char *) xmlNodeGetContent(currentNode);
+                               if (temp)
+                               {
+                                       if (Property::StringToTime(temp, timeOut, unit) &&
+                                                       Property::ConvertTimeUnitToMS(timeOut, unit, timeOut) && group)
+                                       {
+                                               _logger->log_debug("parseRemoteProcessGroup: timeOut => [%lld] ms", (long long) timeOut);
+                                               group->setTimeOut(timeOut);
+                                       }
+                                       xmlFree(temp);
+                               }
+                       }
+                       else if (xmlStrcmp(currentNode->name, BAD_CAST "transmitting") == 0)
+                       {
+                               char *temp = (char *) xmlNodeGetContent(currentNode);
+                               bool transmitting;
+                               if (temp)
+                               {
+                                       if (Property::StringToBool(temp, transmitting) && group)
+                                       {
+                                               _logger->log_debug("parseRemoteProcessGroup: transmitting => [%d]", transmitting);
+                                               group->setTransmitting(transmitting);
+                                       }
+                                       xmlFree(temp);
+                               }
+                       }
+                       else if (xmlStrcmp(currentNode->name, BAD_CAST "inputPort") == 0 && group)
+                       {
+                               // an input port of the remote group is parsed with direction SEND
+                               this->parsePort(doc, currentNode, group, SEND);
+                       }
+                       else if (xmlStrcmp(currentNode->name, BAD_CAST "outputPort") == 0 && group)
+                       {
+                               // an output port of the remote group is parsed with direction RECEIVE
+                               this->parsePort(doc, currentNode, group, RECEIVE);
+                       }
+               } // if (currentNode->type == XML_ELEMENT_NODE)
+       } // for node
 }
 
 void FlowController::parseProcessorProperty(xmlDoc *doc, xmlNode *node, 
Processor *processor)
@@ -383,6 +498,106 @@ void FlowController::parseProcessorProperty(xmlDoc *doc, 
xmlNode *node, Processo
       } // for node
 }
 
+//! Parse an <inputPort>/<outputPort> element of a remote process group
+/*!
+ * Walks the children of processorNode. <id> overrides the randomly generated
+ * UUID and <name> creates a RemoteProcessorGroupPort, which takes its
+ * direction from the argument and its timeout, transmitting flag and yield
+ * period from parent, and is then added to parent. <scheduledState>,
+ * <maxConcurrentTasks> and <property> are applied to that port; they are
+ * skipped while no port exists yet (i.e. when they precede <name> or <name>
+ * is missing) instead of dereferencing a NULL processor.
+ */
+void FlowController::parsePort(xmlDoc *doc, xmlNode *processorNode, ProcessGroup *parent, TransferDirection direction)
+{
+       uuid_t uuid;
+       xmlNode *currentNode;
+       Processor *processor = NULL;
+       RemoteProcessorGroupPort *port = NULL;
+
+       if (!parent)
+       {
+               _logger->log_error("parsePort: no parent group existed");
+               return;
+       }
+       // generate the random UUID, used unless an <id> element supplies one
+       uuid_generate(uuid);
+
+       for (currentNode = processorNode->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next)
+       {
+               if (currentNode->type == XML_ELEMENT_NODE)
+               {
+                       if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0)
+                       {
+                               char *id = (char *) xmlNodeGetContent(currentNode);
+                               if (id)
+                               {
+                                       _logger->log_debug("parsePort: id => [%s]", id);
+                                       uuid_parse(id, uuid);
+                                       xmlFree(id);
+                               }
+                       }
+                       else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0)
+                       {
+                               char *name = (char *) xmlNodeGetContent(currentNode);
+                               if (name)
+                               {
+                                       _logger->log_debug("parsePort: name => [%s]", name);
+                                       // plain new throws on failure, so no NULL check is needed
+                                       port = new RemoteProcessorGroupPort(name, uuid);
+                                       processor = (Processor *) port;
+                                       port->setDirection(direction);
+                                       port->setTimeOut(parent->getTimeOut());
+                                       port->setTransmitting(parent->getTransmitting());
+                                       processor->setYieldPeriodMsec(parent->getYieldPeriodMsec());
+                                       processor->initialize();
+                                       // add processor to parent
+                                       parent->addProcessor(processor);
+                                       xmlFree(name);
+                               }
+                       }
+                       else if (xmlStrcmp(currentNode->name, BAD_CAST "scheduledState") == 0 && processor)
+                       {
+                               char *temp = (char *) xmlNodeGetContent(currentNode);
+                               if (temp)
+                               {
+                                       std::string state = temp;
+                                       if (state == "DISABLED")
+                                       {
+                                               _logger->log_debug("parsePort: scheduledState  => [%s]", state.c_str());
+                                               processor->setScheduledState(DISABLED);
+                                       }
+                                       if (state == "STOPPED")
+                                       {
+                                               _logger->log_debug("parsePort: scheduledState  => [%s]", state.c_str());
+                                               processor->setScheduledState(STOPPED);
+                                       }
+                                       if (state == "RUNNING")
+                                       {
+                                               _logger->log_debug("parsePort: scheduledState  => [%s]", state.c_str());
+                                               processor->setScheduledState(RUNNING);
+                                       }
+                                       xmlFree(temp);
+                               }
+                       }
+                       else if (xmlStrcmp(currentNode->name, BAD_CAST "maxConcurrentTasks") == 0 && processor)
+                       {
+                               char *temp = (char *) xmlNodeGetContent(currentNode);
+                               if (temp)
+                               {
+                                       int64_t maxConcurrentTasks;
+                                       if (Property::StringToInt(temp, maxConcurrentTasks))
+                                       {
+                                               // int64_t must not be passed to %d; print it as long long
+                                               _logger->log_debug("parsePort: maxConcurrentTasks => [%lld]", (long long) maxConcurrentTasks);
+                                               processor->setMaxConcurrentTasks(maxConcurrentTasks);
+                                       }
+                                       xmlFree(temp);
+                               }
+                       }
+                       else if (xmlStrcmp(currentNode->name, BAD_CAST "property") == 0 && processor)
+                       {
+                               this->parseProcessorProperty(doc, currentNode, processor);
+                       }
+               } // if (currentNode->type == XML_ELEMENT_NODE)
+       } // for node
+}
+
 void FlowController::parseProcessorNode(xmlDoc *doc, xmlNode *processorNode, 
ProcessGroup *parent)
 {
        char *id = NULL;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/src/GenerateFlowFile.cpp
----------------------------------------------------------------------
diff --git a/src/GenerateFlowFile.cpp b/src/GenerateFlowFile.cpp
index 16181de..4b0603d 100644
--- a/src/GenerateFlowFile.cpp
+++ b/src/GenerateFlowFile.cpp
@@ -115,6 +115,7 @@ void GenerateFlowFile::onTrigger(ProcessContext *context, 
ProcessSession *sessio
                        {
                                int randValue = random();
                                *((int *) current) = randValue;
+                               // *((int *) current) = (0xFFFFFFFF & i);
                                current += sizeof(int);
                        }
                }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/src/LogAttribute.cpp
----------------------------------------------------------------------
diff --git a/src/LogAttribute.cpp b/src/LogAttribute.cpp
index 67ed74e..82130f8 100644
--- a/src/LogAttribute.cpp
+++ b/src/LogAttribute.cpp
@@ -109,7 +109,7 @@ void LogAttribute::onTrigger(ProcessContext *context, 
ProcessSession *session)
        message << "\n" << "Payload:" << "\n";
        ReadCallback callback(flow->getSize());
        session->read(flow, &callback);
-       for (int i = 0, j = 0; i < callback._readSize; i++)
+       for (unsigned int i = 0, j = 0; i < callback._readSize; i++)
        {
                char temp[8];
                sprintf(temp, "%02x ", (unsigned char) (callback._buffer[i]));
@@ -147,9 +147,10 @@ void LogAttribute::onTrigger(ProcessContext *context, 
ProcessSession *session)
     }
 
     // Test Import
+    /*
     FlowFileRecord *importRecord = session->create();
     session->import(claim->getContentFullPath(), importRecord);
-    session->transfer(importRecord, Success);
+    session->transfer(importRecord, Success); */
 
 
     // Transfer to the relationship

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/src/ProcessGroup.cpp
----------------------------------------------------------------------
diff --git a/src/ProcessGroup.cpp b/src/ProcessGroup.cpp
index a5fd773..d44cc99 100644
--- a/src/ProcessGroup.cpp
+++ b/src/ProcessGroup.cpp
@@ -40,6 +40,9 @@ ProcessGroup::ProcessGroup(ProcessGroupType type, std::string 
name, uuid_t uuid,
        else
                uuid_copy(_uuid, uuid);
 
+       _yieldPeriodMsec = 0;
+       _transmitting = false;
+
        _logger = Logger::getLogger();
        _logger->log_info("ProcessGroup %s created", _name.c_str());
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/src/ProcessSession.cpp
----------------------------------------------------------------------
diff --git a/src/ProcessSession.cpp b/src/ProcessSession.cpp
index 32d8920..2628ae3 100644
--- a/src/ProcessSession.cpp
+++ b/src/ProcessSession.cpp
@@ -118,8 +118,6 @@ FlowFileRecord* 
ProcessSession::cloneDuringTransfer(FlowFileRecord *parent)
                record->_size = parent->_size;
                record->_claim->increaseFlowFileRecordOwnedCount();
            }
-           // Copy connection
-           record->_connection = parent->_connection;
        }
 
        return record;
@@ -132,7 +130,7 @@ FlowFileRecord* ProcessSession::clone(FlowFileRecord 
*parent, long offset, long
        {
                if (parent->_claim)
                {
-                       if ((offset + size) > parent->_size)
+                       if ((offset + size) > (long) parent->_size)
                        {
                                // Set offset and size
                                _logger->log_error("clone offset %d and size %d 
exceed parent size %d",
@@ -486,7 +484,12 @@ void ProcessSession::commit()
                                                else
                                                {
                                                        // Clone the flow file and route to the connection
-                                                       this->cloneDuringTransfer(record);
+                                                       FlowFileRecord *cloneRecord;
+                                                       cloneRecord = this->cloneDuringTransfer(record);
+                                                       if (cloneRecord)
+                                                               cloneRecord->_connection = connection;
+                                                       else
+                                                               throw Exception(PROCESS_SESSION_EXCEPTION, "Can not clone the flow for transfer");
                                                }
                                        }
                                }
@@ -540,7 +543,12 @@ void ProcessSession::commit()
                                                else
                                                {
                                                        // Clone the flow file and route to the connection
-                                                       this->cloneDuringTransfer(record);
+                                                       FlowFileRecord *cloneRecord;
+                                                       cloneRecord = this->cloneDuringTransfer(record);
+                                                       if (cloneRecord)
+                                                               cloneRecord->_connection = connection;
+                                                       else
+                                                               throw Exception(PROCESS_SESSION_EXCEPTION, "Can not clone the flow for transfer");
                                                }
                                        }
                                }
@@ -627,7 +635,10 @@ void ProcessSession::rollback()
                {
                        FlowFileRecord *record = it->second;
                        if (record->_orginalConnection)
+                       {
+                               record->_snapshot = false;
                                record->_orginalConnection->put(record);
+                       }
                        else
                                delete record;
                }
@@ -685,6 +696,7 @@ FlowFileRecord *ProcessSession::get()
                if (ret)
                {
                        // add the flow record to the current process session update map
+                       ret->_markedDelete = false;
                        _updatedFlowFiles[ret->getUUIDStr()] = ret;
                        std::map<std::string, std::string> empty;
                        FlowFileRecord *snapshot = new FlowFileRecord(empty);

Reply via email to