MINIFI-6: Add Site2Site Adjust README
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/e170f7aa Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/e170f7aa Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/e170f7aa Branch: refs/heads/master Commit: e170f7aa47919ac8deb3a9df5b6e98f0e15754cc Parents: 7956696 Author: Bin Qiu <[email protected]> Authored: Thu Jul 14 09:13:34 2016 -0700 Committer: Aldrin Piri <[email protected]> Committed: Tue Aug 2 11:37:42 2016 -0400 ---------------------------------------------------------------------- Makefile | 4 +- README.md | 56 +- conf/flow.xml | 215 ++++- conf/flowServer.xml | 130 +++ conf/flow_Site2SiteServer.xml | 140 +++ conf/nifi.properties | 6 + inc/Exception.h | 2 + inc/FlowControlProtocol.h | 5 + inc/FlowController.h | 11 + inc/ProcessGroup.h | 42 + inc/RealTimeDataCollector.h | 131 +++ inc/RemoteProcessorGroupPort.h | 96 +++ inc/Site2SiteClientProtocol.h | 633 ++++++++++++++ inc/Site2SitePeer.h | 359 ++++++++ inc/TimeUtil.h | 2 +- main/MiNiFiMain.cpp | 2 +- src/FlowControlProtocol.cpp | 2 + src/FlowController.cpp | 223 ++++- src/GenerateFlowFile.cpp | 1 + src/LogAttribute.cpp | 5 +- src/ProcessGroup.cpp | 3 + src/ProcessSession.cpp | 22 +- src/RealTimeDataCollector.cpp | 482 +++++++++++ src/RemoteProcessorGroupPort.cpp | 99 +++ src/Site2SiteClientProtocol.cpp | 1313 +++++++++++++++++++++++++++++ src/Site2SitePeer.cpp | 434 ++++++++++ target/conf/flow.xml | 131 ++- target/conf/flowServer.xml | 130 +++ target/conf/flow_Site2SiteServer.xml | 140 +++ target/conf/nifi.properties | 2 +- thirdparty/uuid/tst_uuid | Bin 191032 -> 29660 bytes 31 files changed, 4753 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/Makefile ---------------------------------------------------------------------- diff --git a/Makefile b/Makefile index 
3128422..963f473 100644 --- a/Makefile +++ b/Makefile @@ -28,7 +28,7 @@ LDDIRECTORY=-L./build -L./thirdparty/uuid -L./thirdparty/libxml2/.libs/ #-L/usr/ #LDFLAGS=-lminifi -lxml2 -lleveldb -pthread -luuid LDFLAGS=-static -lminifi -lxml2 -pthread -luuid else ifeq ($(ARCH), linux) -CFLAGS=-O0 -fexceptions -fpermissive -Wno-write-strings -std=c++11 -fPIC -Wall -g -Wno-unused-private-field +CFLAGS=-O0 -fexceptions -fpermissive -Wno-write-strings -std=c++11 -fPIC -Wall -g INCLUDES=-I./inc -I./src -I./thirdparty -I./test -I./thirdparty/libxml2/include #-I/usr/local/opt/leveldb/include/ LDDIRECTORY=-L./build -L./thirdparty/uuid -L./thirdparty/libxml2/.libs/ #-L/usr/local/opt/leveldb/lib #LDFLAGS=-lminifi -lxml2 -lleveldb -pthread -luuid @@ -37,7 +37,7 @@ else CFLAGS=-O0 -fexceptions -fpermissive -Wno-write-strings -std=c++11 -fPIC -Wall -g -Wno-unused-private-field INCLUDES=-I./inc -I./src -I./test -I/usr/include/libxml2 #-I/usr/local/opt/leveldb/include/ LDDIRECTORY=-L./build #-L/usr/local/opt/leveldb/out-static/ -LDFLAGS=-lminifi -lxml2 -pthread -luuid#--llevedb +LDFLAGS=-lminifi -lxml2 -pthread -luuid #--llevedb endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/README.md ---------------------------------------------------------------------- diff --git a/README.md b/README.md index d575cb1..6c4137e 100644 --- a/README.md +++ b/README.md @@ -37,8 +37,6 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. ## Dependencies - * [LevelDB](https://github.com/google/leveldb) - tested with v1.18 - MAC: brew install leveldb * gcc - 4.8.4 * g++ - 4.8.4 * [libxml2](http://xmlsoft.org/) - tested with 2.9.1 @@ -48,26 +46,58 @@ limitations under the License. 
## Build instructions -Build application +Build the application. This builds the minifi executable under build/ and copies it to the target directory. - $ make clean $ make -Build tests - - $ make tests - Clean $ make clean - ## Running Running application -The nifi flow.xml and nifi.properties are in target/conf - $ ./target/minifi -Runnning tests + $ ./target/minifi - $ ./build/FlowFileRecordTest +The Native MiNiFi example flow.xml is in target/conf. +It showcases a Native MiNiFi client that generates flowfiles, logs them, and pushes them to the NiFi server. +It can also pull flowfiles from the NiFi server and log them. +The NiFi server configuration is in target/conf/flow_Site2SiteServer.xml. + +To try the command control protocol between Native MiNiFi and the NiFi server, see the example NiFi server implementation in test/Server.cpp. +The command control protocol is not finalized yet. + +Caveats: +1) +Add the new properties "Host Name" and "Port" to the RemoteProcessGroup input/output ports to set the remote Site2Site host name and port, for example: +<remoteProcessGroup> + <id>8f3b248f-d493-4269-b317-36f85719f480</id> + <name>NiFi Flow</name> + <url>http://localhost:8081/nifi</url> + <timeout>30 sec</timeout> + <yieldPeriod>1 sec</yieldPeriod> + <transmitting>true</transmitting> + <inputPort> + <id>471deef6-2a6e-4a7d-912a-81cc17e3a204</id> + <name> From Node A</name> + <position x="0.0" y="0.0"/> + <comments/> + <scheduledState>RUNNING</scheduledState> + <maxConcurrentTasks>1</maxConcurrentTasks> + <useCompression>false</useCompression> + <property> + <name>Host Name</name> + <value>localhost</value> + </property> + <property> + <name>Port</name> + <value>10001</value> + </property> + </inputPort> +2) +Add the new properties to nifi.properties for command control: +# MiNiFi Server for Command Control +nifi.server.name=localhost +nifi.server.port=9000 +nifi.server.report.interval=1000 ms http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/conf/flow.xml 
---------------------------------------------------------------------- diff --git a/conf/flow.xml b/conf/flow.xml index a29ea1f..51b74e8 100644 --- a/conf/flow.xml +++ b/conf/flow.xml @@ -8,14 +8,14 @@ <position x="0.0" y="0.0"/> <comment/> <processor> - <id>572aa3f3-6288-4ca1-ae43-5e492cb0ea23</id> - <name>RealTimeDataCollector</name> - <position x="3259.732177734375" y="1739.991943359375"/> + <id>e01275ae-ac38-48f9-ac53-1a44df1be88e</id> + <name>LogAttribute</name> + <position x="3950.0958625440016" y="1355.8949219185629"/> <styles/> <comment/> - <class>org.apache.nifi.processors.standard.RealTimeDataCollector</class> - <maxConcurrentTasks>2</maxConcurrentTasks> - <schedulingPeriod>1 ms</schedulingPeriod> + <class>org.apache.nifi.processors.standard.LogAttribute</class> + <maxConcurrentTasks>1</maxConcurrentTasks> + <schedulingPeriod>0 sec</schedulingPeriod> <penalizationPeriod>30 sec</penalizationPeriod> <yieldPeriod>1 sec</yieldPeriod> <bulletinLevel>WARN</bulletinLevel> @@ -24,51 +24,206 @@ <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> <runDurationNanos>0</runDurationNanos> <property> - <name>File Name</name> - <value>data.osp></value> + <name>Log Level</name> + <value>info</value> </property> <property> - <name>Real Time Server Name</name> - <value>localhost</value> + <name>Log Payload</name> + <value>false</value> </property> <property> - <name>Real Time Server Port</name> - <value>10000</value> + <name>Attributes to Log</name> </property> <property> - <name>Batch Server Name</name> - <value>localhost</value> + <name>Attributes to Ignore</name> </property> <property> - <name>Batch Server Port</name> - <value>10001</value> + <name>Log prefix</name> </property> + <autoTerminatedRelationship>success</autoTerminatedRelationship> + </processor> + <processor> + <id>572aa3f3-6288-4ca1-ae43-5e492cb0ea23</id> + <name>LogAttribute</name> + <position x="3259.732177734375" y="1739.991943359375"/> + <styles/> + <comment/> + 
<class>org.apache.nifi.processors.standard.LogAttribute</class> + <maxConcurrentTasks>1</maxConcurrentTasks> + <schedulingPeriod>0 sec</schedulingPeriod> + <penalizationPeriod>30 sec</penalizationPeriod> + <yieldPeriod>1 sec</yieldPeriod> + <bulletinLevel>WARN</bulletinLevel> + <lossTolerant>false</lossTolerant> + <scheduledState>RUNNING</scheduledState> + <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> + <runDurationNanos>0</runDurationNanos> <property> - <name>Iteration</name> - <value>true</value> + <name>Log Level</name> + <value>info</value> </property> <property> - <name>Real Time Message ID</name> - <value>41</value> + <name>Log Payload</name> + <value>false</value> </property> <property> - <name>Batch Message ID</name> - <value>172, 30, 48</value> + <name>Attributes to Log</name> </property> <property> - <name>Real Time Interval</name> - <value>10 ms</value> + <name>Attributes to Ignore</name> </property> <property> - <name>Batch Time Interval</name> - <value>100 ms</value> + <name>Log prefix</name> </property> + <autoTerminatedRelationship>success</autoTerminatedRelationship> + </processor> + <processor> + <id>a0e57bb2-5b89-438e-8869-0326bbdbbe43</id> + <name>GenerateFlowFile</name> + <position x="2643.1135987796815" y="1457.4419966791334"/> + <styles/> + <comment/> + <class>org.apache.nifi.processors.standard.GenerateFlowFile</class> + <maxConcurrentTasks>1</maxConcurrentTasks> + <schedulingPeriod>1 s</schedulingPeriod> + <penalizationPeriod>30 sec</penalizationPeriod> + <yieldPeriod>1 sec</yieldPeriod> + <bulletinLevel>WARN</bulletinLevel> + <lossTolerant>false</lossTolerant> + <scheduledState>RUNNING</scheduledState> + <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> + <runDurationNanos>0</runDurationNanos> <property> - <name>Batch Max Buffer Size</name> - <value>1048576</value> + <name>File Size</name> + <value>1024 kB</value> + </property> + <property> + <name>Batch Size</name> + <value>1</value> + </property> + <property> + <name>Data 
Format</name> + <value>Text</value> + </property> + <property> + <name>Unique FlowFiles</name> + <value>false</value> </property> - <autoTerminatedRelationship>success</autoTerminatedRelationship> </processor> + <label> + <id>809d63d9-6feb-496a-9dc3-d23c217e52fd</id> + <position x="3635.581271381991" y="1309.9918825902428"/> + <size height="193.5023651123047" width="641.0671997070312"/> + <styles> + <style name="background-color">#9a91ff</style> + <style name="font-size">16px</style> + </styles> + <value>Pull From Node B</value> + </label> + <label> + <id>d95ce8d3-c005-4d0b-8fcc-b2f6fae7172f</id> + <position x="2601.7320892530847" y="1413.1875613011803"/> + <size height="193.5023651123047" width="641.0671997070312"/> + <styles> + <style name="font-size">16px</style> + </styles> + <value>Push to Node B</value> + </label> + <remoteProcessGroup> + <id>8f3b248f-d493-4269-b317-36f85719f480</id> + <name>NiFi Flow</name> + <position x="3254.3356850982673" y="1432.3274284388426"/> + <comment/> + <url>http://localhost:8081/nifi</url> + <timeout>30 sec</timeout> + <yieldPeriod>1 sec</yieldPeriod> + <transmitting>true</transmitting> + <inputPort> + <id>471deef6-2a6e-4a7d-912a-81cc17e3a204</id> + <name> From Node A</name> + <position x="0.0" y="0.0"/> + <comments/> + <scheduledState>RUNNING</scheduledState> + <maxConcurrentTasks>1</maxConcurrentTasks> + <useCompression>false</useCompression> + <property> + <name>Host Name</name> + <value>localhost</value> + </property> + <property> + <name>Port</name> + <value>10001</value> + </property> + </inputPort> + <outputPort> + <id>75f88005-0a87-4fef-8320-6219cdbcf18b</id> + <name>To A</name> + <position x="0.0" y="0.0"/> + <comments/> + <scheduledState>RUNNING</scheduledState> + <maxConcurrentTasks>1</maxConcurrentTasks> + <useCompression>false</useCompression> + <property> + <name>Host Name</name> + <value>localhost</value> + </property> + <property> + <name>Port</name> + <value>10001</value> + </property> + </outputPort> + 
</remoteProcessGroup> + <connection> + <id>c4cf70d8-be05-4c3d-b926-465f330d6503</id> + <name/> + <bendPoints/> + <labelIndex>1</labelIndex> + <zIndex>0</zIndex> + <sourceId>a0e57bb2-5b89-438e-8869-0326bbdbbe43</sourceId> + <sourceGroupId>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</sourceGroupId> + <sourceType>PROCESSOR</sourceType> + <destinationId>572aa3f3-6288-4ca1-ae43-5e492cb0ea23</destinationId> + <destinationGroupId>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</destinationGroupId> + <destinationType>PROCESSOR</destinationType> + <relationship>success</relationship> + <maxWorkQueueSize>0</maxWorkQueueSize> + <maxWorkQueueDataSize>0 MB</maxWorkQueueDataSize> + <flowFileExpiration>0 sec</flowFileExpiration> + </connection> + <connection> + <id>c9573abe-937c-464b-b18d-48b29c42dce2</id> + <name>site2siteSEND</name> + <bendPoints/> + <labelIndex>1</labelIndex> + <zIndex>0</zIndex> + <sourceId>a0e57bb2-5b89-438e-8869-0326bbdbbe43</sourceId> + <sourceGroupId>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</sourceGroupId> + <sourceType>PROCESSOR</sourceType> + <destinationId>471deef6-2a6e-4a7d-912a-81cc17e3a204</destinationId> + <destinationGroupId>8f3b248f-d493-4269-b317-36f85719f480</destinationGroupId> + <destinationType>REMOTE_INPUT_PORT</destinationType> + <relationship>success</relationship> + <maxWorkQueueSize>0</maxWorkQueueSize> + <maxWorkQueueDataSize>0 MB</maxWorkQueueDataSize> + <flowFileExpiration>0 sec</flowFileExpiration> + </connection> + <connection> + <id>2cb90b4c-d6cb-4fef-8f0f-b16459561af5</id> + <name>site2siteReceive</name> + <bendPoints/> + <labelIndex>1</labelIndex> + <zIndex>0</zIndex> + <sourceId>75f88005-0a87-4fef-8320-6219cdbcf18b</sourceId> + <sourceGroupId>8f3b248f-d493-4269-b317-36f85719f480</sourceGroupId> + <sourceType>REMOTE_OUTPUT_PORT</sourceType> + <destinationId>e01275ae-ac38-48f9-ac53-1a44df1be88e</destinationId> + <destinationGroupId>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</destinationGroupId> + <destinationType>PROCESSOR</destinationType> + 
<relationship/> + <maxWorkQueueSize>0</maxWorkQueueSize> + <maxWorkQueueDataSize>0 MB</maxWorkQueueDataSize> + <flowFileExpiration>0 sec</flowFileExpiration> + </connection> </rootGroup> <controllerServices> <controllerService> @@ -114,7 +269,7 @@ </property> <property> <name>Truststore Password</name> - <value>enc{9E2EE146023A0F31914706460EB177B357796CF0C768DECE09D10C4B40F344C8}</value> + <value>enc{3A31531B76B6395A72FB8BEB4C93E2040877D07C04FDAB5A84499B918BECEB77}</value> </property> <property> <name>Truststore Type</name> http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/conf/flowServer.xml ---------------------------------------------------------------------- diff --git a/conf/flowServer.xml b/conf/flowServer.xml new file mode 100644 index 0000000..caca3eb --- /dev/null +++ b/conf/flowServer.xml @@ -0,0 +1,130 @@ +<?xml version="1.0" encoding="UTF-8" standalone="no"?> +<flowController> + <maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount> + <maxEventDrivenThreadCount>5</maxEventDrivenThreadCount> + <rootGroup> + <id>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</id> + <name>NiFi Flow</name> + <position x="0.0" y="0.0"/> + <comment/> + <processor> + <id>572aa3f3-6288-4ca1-ae43-5e492cb0ea23</id> + <name>RealTimeDataCollector</name> + <position x="3259.732177734375" y="1739.991943359375"/> + <styles/> + <comment/> + <class>org.apache.nifi.processors.standard.RealTimeDataCollector</class> + <maxConcurrentTasks>2</maxConcurrentTasks> + <schedulingPeriod>10 ms</schedulingPeriod> + <penalizationPeriod>30 sec</penalizationPeriod> + <yieldPeriod>1 sec</yieldPeriod> + <bulletinLevel>WARN</bulletinLevel> + <lossTolerant>false</lossTolerant> + <scheduledState>RUNNING</scheduledState> + <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> + <runDurationNanos>0</runDurationNanos> + <property> + <name>File Name</name> + <value>data.osp</value> + </property> + <property> + <name>Real Time Server Name</name> + <value>localhost</value> + </property> + <property> 
+ <name>Real Time Server Port</name> + <value>10000</value> + </property> + <property> + <name>Batch Server Name</name> + <value>localhost</value> + </property> + <property> + <name>Batch Server Port</name> + <value>10001</value> + </property> + <property> + <name>Iteration</name> + <value>true</value> + </property> + <property> + <name>Real Time Message ID</name> + <value>41</value> + </property> + <property> + <name>Batch Message ID</name> + <value>172,48</value> + </property> + <property> + <name>Real Time Interval</name> + <value>200 ms</value> + </property> + <property> + <name>Batch Time Interval</name> + <value>1 sec</value> + </property> + <property> + <name>Batch Max Buffer Size</name> + <value>262144</value> + </property> + <autoTerminatedRelationship>success</autoTerminatedRelationship> + </processor> + </rootGroup> + <controllerServices> + <controllerService> + <id>b2785fb0-e797-4c4d-8592-d2b2563504c4</id> + <name>DistributedMapCacheClientService</name> + <comment/> + <class>org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService</class> + <enabled>true</enabled> + <property> + <name>Server Hostname</name> + <value>localhost</value> + </property> + <property> + <name>Server Port</name> + <value>4557</value> + </property> + <property> + <name>SSL Context Service</name> + </property> + <property> + <name>Communications Timeout</name> + <value>30 secs</value> + </property> + </controllerService> + <controllerService> + <id>2855f1e0-dc35-4955-9ae2-b2d7d1765d4e</id> + <name>StandardSSLContextService</name> + <comment/> + <class>org.apache.nifi.ssl.StandardSSLContextService</class> + <enabled>true</enabled> + <property> + <name>Keystore Filename</name> + </property> + <property> + <name>Keystore Password</name> + </property> + <property> + <name>Keystore Type</name> + </property> + <property> + <name>Truststore Filename</name> + <value>/Library/Java/JavaVirtualMachines/jdk1.8.0_73.jdk/Contents/Home/jre/lib/security/cacerts</value> + 
</property> + <property> + <name>Truststore Password</name> + <value>enc{9E2EE146023A0F31914706460EB177B357796CF0C768DECE09D10C4B40F344C8}</value> + </property> + <property> + <name>Truststore Type</name> + <value>JKS</value> + </property> + <property> + <name>SSL Protocol</name> + <value>TLS</value> + </property> + </controllerService> + </controllerServices> + <reportingTasks/> +</flowController> http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/conf/flow_Site2SiteServer.xml ---------------------------------------------------------------------- diff --git a/conf/flow_Site2SiteServer.xml b/conf/flow_Site2SiteServer.xml new file mode 100644 index 0000000..acd2c1e --- /dev/null +++ b/conf/flow_Site2SiteServer.xml @@ -0,0 +1,140 @@ +<?xml version="1.0" encoding="UTF-8" standalone="no"?> +<flowController> + <maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount> + <maxEventDrivenThreadCount>5</maxEventDrivenThreadCount> + <rootGroup> + <id>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</id> + <name>NiFi Flow</name> + <position x="0.0" y="0.0"/> + <comment/> + <processor> + <id>cd274fef-168a-486b-b21a-04ed17f981b7</id> + <name>LogAttribute</name> + <position x="2823.8107761867964" y="623.2524160253959"/> + <styles/> + <comment/> + <class>org.apache.nifi.processors.standard.LogAttribute</class> + <maxConcurrentTasks>1</maxConcurrentTasks> + <schedulingPeriod>0 sec</schedulingPeriod> + <penalizationPeriod>30 sec</penalizationPeriod> + <yieldPeriod>1 sec</yieldPeriod> + <bulletinLevel>WARN</bulletinLevel> + <lossTolerant>false</lossTolerant> + <scheduledState>RUNNING</scheduledState> + <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> + <runDurationNanos>0</runDurationNanos> + <property> + <name>Log Level</name> + <value>info</value> + </property> + <property> + <name>Log Payload</name> + <value>true</value> + </property> + <property> + <name>Attributes to Log</name> + </property> + <property> + <name>Attributes to Ignore</name> + </property> + <property> + 
<name>Log prefix</name> + </property> + <autoTerminatedRelationship>success</autoTerminatedRelationship> + </processor> + <processor> + <id>4fa35a7d-d1f0-44e4-87d7-7d69f0b78b7b</id> + <name>GenerateFlowFile</name> + <position x="2248.4411151522036" y="917.8589272756209"/> + <styles/> + <comment/> + <class>org.apache.nifi.processors.standard.GenerateFlowFile</class> + <maxConcurrentTasks>1</maxConcurrentTasks> + <schedulingPeriod>1 sec</schedulingPeriod> + <penalizationPeriod>30 sec</penalizationPeriod> + <yieldPeriod>1 sec</yieldPeriod> + <bulletinLevel>WARN</bulletinLevel> + <lossTolerant>false</lossTolerant> + <scheduledState>RUNNING</scheduledState> + <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> + <runDurationNanos>0</runDurationNanos> + <property> + <name>File Size</name> + <value>1024 kB</value> + </property> + <property> + <name>Batch Size</name> + <value>1</value> + </property> + <property> + <name>Data Format</name> + <value>Text</value> + </property> + <property> + <name>Unique FlowFiles</name> + <value>false</value> + </property> + </processor> + <inputPort> + <id>471deef6-2a6e-4a7d-912a-81cc17e3a204</id> + <name> From Node A</name> + <position x="2305.369919163486" y="646.0466623031645"/> + <comments/> + <scheduledState>RUNNING</scheduledState> + <maxConcurrentTasks>1</maxConcurrentTasks> + </inputPort> + <outputPort> + <id>75f88005-0a87-4fef-8320-6219cdbcf18b</id> + <name>To A</name> + <position x="2915.739181824911" y="1057.8803860295386"/> + <comments/> + <scheduledState>RUNNING</scheduledState> + <maxConcurrentTasks>1</maxConcurrentTasks> + </outputPort> + <label> + <id>2f0db43e-1ce0-49ab-96a5-459c285aff09</id> + <position x="2197.3693058093504" y="849.4395700448451"/> + <size height="286.5726013183594" width="1012.2957763671875"/> + <styles> + <style name="font-size">18px</style> + </styles> + <value>Generate Data that is pushed to Node A and made available to be pulled</value> + </label> + <connection> + 
<id>7f869898-3a93-4e28-a60c-064789870574</id> + <name/> + <bendPoints/> + <labelIndex>1</labelIndex> + <zIndex>0</zIndex> + <sourceId>471deef6-2a6e-4a7d-912a-81cc17e3a204</sourceId> + <sourceGroupId>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</sourceGroupId> + <sourceType>INPUT_PORT</sourceType> + <destinationId>cd274fef-168a-486b-b21a-04ed17f981b7</destinationId> + <destinationGroupId>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</destinationGroupId> + <destinationType>PROCESSOR</destinationType> + <relationship/> + <maxWorkQueueSize>0</maxWorkQueueSize> + <maxWorkQueueDataSize>0 MB</maxWorkQueueDataSize> + <flowFileExpiration>0 sec</flowFileExpiration> + </connection> + <connection> + <id>9dbc73f6-c827-4258-8bc7-06eb6a9b79d5</id> + <name/> + <bendPoints/> + <labelIndex>1</labelIndex> + <zIndex>0</zIndex> + <sourceId>4fa35a7d-d1f0-44e4-87d7-7d69f0b78b7b</sourceId> + <sourceGroupId>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</sourceGroupId> + <sourceType>PROCESSOR</sourceType> + <destinationId>75f88005-0a87-4fef-8320-6219cdbcf18b</destinationId> + <destinationGroupId>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</destinationGroupId> + <destinationType>OUTPUT_PORT</destinationType> + <relationship>success</relationship> + <maxWorkQueueSize>0</maxWorkQueueSize> + <maxWorkQueueDataSize>0 MB</maxWorkQueueDataSize> + <flowFileExpiration>0 sec</flowFileExpiration> + </connection> + </rootGroup> + <controllerServices/> + <reportingTasks/> +</flowController> http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/conf/nifi.properties ---------------------------------------------------------------------- diff --git a/conf/nifi.properties b/conf/nifi.properties index c4f7dff..627876f 100644 --- a/conf/nifi.properties +++ b/conf/nifi.properties @@ -183,3 +183,9 @@ nifi.cluster.manager.safemode.duration=0 sec # kerberos # nifi.kerberos.krb5.file= + +# MiNiFi Server for Command Control +nifi.server.name=localhost +nifi.server.port=9000 +nifi.server.report.interval=1000 ms + 
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/inc/Exception.h ---------------------------------------------------------------------- diff --git a/inc/Exception.h b/inc/Exception.h index c107123..d321454 100644 --- a/inc/Exception.h +++ b/inc/Exception.h @@ -34,6 +34,7 @@ enum ExceptionType PROCESSOR_EXCEPTION, PROCESS_SESSION_EXCEPTION, PROCESS_SCHEDULE_EXCEPTION, + SITE2SITE_EXCEPTION, GENERAL_EXCEPTION, MAX_EXCEPTION }; @@ -46,6 +47,7 @@ static const char *ExceptionStr[MAX_EXCEPTION] = "Processor Operation", "Process Session Operation", "Process Schedule Operation", + "Site2Site Protocol", "General Operation" }; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/inc/FlowControlProtocol.h ---------------------------------------------------------------------- diff --git a/inc/FlowControlProtocol.h b/inc/FlowControlProtocol.h index ae31ce5..24416f2 100644 --- a/inc/FlowControlProtocol.h +++ b/inc/FlowControlProtocol.h @@ -248,6 +248,11 @@ public: } //! Run function for the thread static void run(FlowControlProtocol *protocol); + //! set 8 bytes SerialNumber + void setSerialNumber(uint8_t *number) + { + memcpy(_serialNumber, number, 8); + } protected: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/inc/FlowController.h ---------------------------------------------------------------------- diff --git a/inc/FlowController.h b/inc/FlowController.h index 4fa5810..3895096 100644 --- a/inc/FlowController.h +++ b/inc/FlowController.h @@ -42,8 +42,10 @@ #include "ProcessGroup.h" #include "GenerateFlowFile.h" #include "LogAttribute.h" +#include "RealTimeDataCollector.h" #include "TimerDrivenSchedulingAgent.h" #include "FlowControlProtocol.h" +#include "RemoteProcessorGroupPort.h" //! Default NiFi Root Group Name #define DEFAULT_ROOT_GROUP_NAME "" @@ -139,6 +141,11 @@ public: ProcessGroup *createRemoteProcessGroup(std::string name, uuid_t uuid); //! 
Create Connection Connection *createConnection(std::string name, uuid_t uuid); + //! set 8 bytes SerialNumber + void setSerialNumber(uint8_t *number) + { + _protocol->setSerialNumber(number); + } protected: @@ -183,12 +190,16 @@ private: std::atomic<bool> _initialized; //! Process Processor Node XML void parseProcessorNode(xmlDoc *doc, xmlNode *processorNode, ProcessGroup *parent); + //! Process Port XML + void parsePort(xmlDoc *doc, xmlNode *processorNode, ProcessGroup *parent, TransferDirection direction); //! Process Root Processor Group XML void parseRootProcessGroup(xmlDoc *doc, xmlNode *node); //! Process Property XML void parseProcessorProperty(xmlDoc *doc, xmlNode *node, Processor *processor); //! Process connection XML void parseConnection(xmlDoc *doc, xmlNode *node, ProcessGroup *parent); + //! Process Remote Process Group + void parseRemoteProcessGroup(xmlDoc *doc, xmlNode *node, ProcessGroup *parent); // Prevent default copy constructor and assignment operation // Only support pass by reference or pointer FlowController(const FlowController &parent); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/inc/ProcessGroup.h ---------------------------------------------------------------------- diff --git a/inc/ProcessGroup.h b/inc/ProcessGroup.h index 796142f..be69f2d 100644 --- a/inc/ProcessGroup.h +++ b/inc/ProcessGroup.h @@ -61,6 +61,41 @@ public: std::string getName(void) { return (_name); } + //! Set URL + void setURL(std::string url) { + _url = url; + } + //! Get URL + std::string getURL(void) { + return (_url); + } + //! SetTransmitting + void setTransmitting(bool val) + { + _transmitting = val; + } + //! Get Transmitting + bool getTransmitting() + { + return _transmitting; + } + //! setTimeOut + void setTimeOut(uint64_t time) + { + _timeOut = time; + } + uint64_t getTimeOut() + { + return _timeOut; + } + //! Set Processor yield period in MilliSecond + void setYieldPeriodMsec(uint64_t period) { + _yieldPeriodMsec = period; + } + //! 
Get Processor yield period in MilliSecond + uint64_t getYieldPeriodMsec(void) { + return(_yieldPeriodMsec); + } //! Set UUID void setUUID(uuid_t uuid) { uuid_copy(_uuid, uuid); @@ -122,6 +157,13 @@ protected: std::set<Connection *> _connections; //! Parent Process Group ProcessGroup* _parentProcessGroup; + //! Yield Period in Milliseconds + std::atomic<uint64_t> _yieldPeriodMsec; + std::atomic<uint64_t> _timeOut; + //! URL + std::string _url; + //! Transmitting + std::atomic<bool> _transmitting; private: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/inc/RealTimeDataCollector.h ---------------------------------------------------------------------- diff --git a/inc/RealTimeDataCollector.h b/inc/RealTimeDataCollector.h new file mode 100644 index 0000000..760b566 --- /dev/null +++ b/inc/RealTimeDataCollector.h @@ -0,0 +1,131 @@ +/** + * @file RealTimeDataCollector.h + * RealTimeDataCollector class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef __REAL_TIME_DATA_COLLECTOR_H__ +#define __REAL_TIME_DATA_COLLECTOR_H__ + +#include <stdio.h> +#include <unistd.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <fcntl.h> +#include <netdb.h> +#include <string> +#include <errno.h> +#include "FlowFileRecord.h" +#include "Processor.h" +#include "ProcessSession.h" + +//! RealTimeDataCollector Class +class RealTimeDataCollector : public Processor +{ +public: + //! Constructor + /*! + * Create a new processor + */ + RealTimeDataCollector(std::string name, uuid_t uuid = NULL) + : Processor(name, uuid) + { + _realTimeSocket = 0; + _batchSocket = 0; + _logger = Logger::getLogger(); + _firstInvoking = false; + _realTimeAccumulated = 0; + _batchAcccumulated = 0; + _queuedDataSize = 0; + } + //! Destructor + virtual ~RealTimeDataCollector() + { + if (_realTimeSocket) + close(_realTimeSocket); + if (_batchSocket) + close(_batchSocket); + if (_fileStream.is_open()) + _fileStream.close(); + } + //! Processor Name + static const std::string ProcessorName; + //! Supported Properties + static Property REALTIMESERVERNAME; + static Property REALTIMESERVERPORT; + static Property BATCHSERVERNAME; + static Property BATCHSERVERPORT; + static Property FILENAME; + static Property ITERATION; + static Property REALTIMEMSGID; + static Property BATCHMSGID; + static Property REALTIMEINTERVAL; + static Property BATCHINTERVAL; + static Property BATCHMAXBUFFERSIZE; + //! Supported Relationships + static Relationship Success; + //! Connect to the socket + int connectServer(const char *host, uint16_t port); + int sendData(int socket, const char *buf, int buflen); + void onTriggerRealTime(ProcessContext *context, ProcessSession *session); + void onTriggerBatch(ProcessContext *context, ProcessSession *session); + +public: + //! OnTrigger method, implemented by NiFi RealTimeDataCollector + virtual void onTrigger(ProcessContext *context, ProcessSession *session); + //! 
Initialize, over write by NiFi RealTimeDataCollector + virtual void initialize(void); + +protected: + +private: + //! realtime server Name + std::string _realTimeServerName; + int64_t _realTimeServerPort; + std::string _batchServerName; + int64_t _batchServerPort; + int64_t _realTimeInterval; + int64_t _batchInterval; + int64_t _batchMaxBufferSize; + //! Match pattern for Real time Message ID + std::vector<std::string> _realTimeMsgID; + //! Match pattern for Batch Message ID + std::vector<std::string> _batchMsgID; + //! file for which the realTime collector will tail + std::string _fileName; + //! Whether we need to iterate from the beginning for demo + bool _iteration; + int _realTimeSocket; + int _batchSocket; + //! Logger + Logger *_logger; + //! Mutex for protection + std::mutex _mtx; + //! Queued data size + uint64_t _queuedDataSize; + //! Queue for the batch process + std::queue<std::string> _queue; + std::thread::id _realTimeThreadId; + std::thread::id _batchThreadId; + std::atomic<bool> _firstInvoking; + int64_t _realTimeAccumulated; + int64_t _batchAcccumulated; + std::ifstream _fileStream; +}; + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/inc/RemoteProcessorGroupPort.h ---------------------------------------------------------------------- diff --git a/inc/RemoteProcessorGroupPort.h b/inc/RemoteProcessorGroupPort.h new file mode 100644 index 0000000..cd99e50 --- /dev/null +++ b/inc/RemoteProcessorGroupPort.h @@ -0,0 +1,96 @@ +/** + * @file RemoteProcessorGroupPort.h + * RemoteProcessorGroupPort class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __REMOTE_PROCESSOR_GROUP_PORT_H__ +#define __REMOTE_PROCESSOR_GROUP_PORT_H__ + +#include "FlowFileRecord.h" +#include "Processor.h" +#include "ProcessSession.h" +#include "Site2SiteClientProtocol.h" + +//! RemoteProcessorGroupPort Class +class RemoteProcessorGroupPort : public Processor +{ +public: + //! Constructor + /*! + * Create a new processor + */ + RemoteProcessorGroupPort(std::string name, uuid_t uuid = NULL) + : Processor(name, uuid) + { + _logger = Logger::getLogger(); + _peer = new Site2SitePeer("", 9999); + _protocol = new Site2SiteClientProtocol(_peer); + _protocol->setPortId(uuid); + } + //! Destructor + virtual ~RemoteProcessorGroupPort() + { + delete _protocol; + delete _peer; + } + //! Processor Name + static const std::string ProcessorName; + //! Supported Properties + static Property hostName; + static Property port; + //! Supported Relationships + static Relationship relation; +public: + //! OnTrigger method, implemented by NiFi RemoteProcessorGroupPort + virtual void onTrigger(ProcessContext *context, ProcessSession *session); + //! Initialize, over write by NiFi RemoteProcessorGroupPort + virtual void initialize(void); + //! Set Direction + void setDirection(TransferDirection direction) + { + _direction = direction; + if (_direction == RECEIVE) + this->setTriggerWhenEmpty(true); + } + //! Set Timeout + void setTimeOut(uint64_t timeout) + { + _protocol->setTimeOut(timeout); + } + //! SetTransmitting + void setTransmitting(bool val) + { + _transmitting = val; + } + +protected: + +private: + //! 
Logger + Logger *_logger; + //! Peer Connection + Site2SitePeer *_peer; + //! Peer Protocol + Site2SiteClientProtocol *_protocol; + //! Transaction Direction + TransferDirection _direction; + //! Transmitting + bool _transmitting; + +}; + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/inc/Site2SiteClientProtocol.h ---------------------------------------------------------------------- diff --git a/inc/Site2SiteClientProtocol.h b/inc/Site2SiteClientProtocol.h new file mode 100644 index 0000000..2a517d7 --- /dev/null +++ b/inc/Site2SiteClientProtocol.h @@ -0,0 +1,633 @@ +/** + * @file Site2SiteClientProtocol.h + * Site2SiteClientProtocol class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef __SITE2SITE_CLIENT_PROTOCOL_H__ +#define __SITE2SITE_CLIENT_PROTOCOL_H__ + +#include <stdio.h> +#include <unistd.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <fcntl.h> +#include <netdb.h> +#include <string> +#include <errno.h> +#include <chrono> +#include <thread> +#include <algorithm> +#include <uuid/uuid.h> +#include "Logger.h" +#include "Configure.h" +#include "Property.h" +#include "Site2SitePeer.h" +#include "FlowFileRecord.h" +#include "ProcessContext.h" +#include "ProcessSession.h" + +//! Resource Negotiated Status Code +#define RESOURCE_OK 20 +#define DIFFERENT_RESOURCE_VERSION 21 +#define NEGOTIATED_ABORT 255 +// ! Max attributes +#define MAX_NUM_ATTRIBUTES 25000 + +/** + * An enumeration for specifying the direction in which data should be + * transferred between a client and a remote NiFi instance. + */ +typedef enum { + /** + * * The client is to send data to the remote instance. + * */ + SEND, + /** + * * The client is to receive data from the remote instance. + * */ + RECEIVE +} TransferDirection; + + +//! Peer State +typedef enum { + /** + * * IDLE + * */ + IDLE = 0, + /** + * * Socket Established + * */ + ESTABLISHED, + /** + * * HandShake Done + * */ + HANDSHAKED, + /** + * * After CodeDec Completion + * */ + READY +} PeerState; + +//! Transaction State +typedef enum { + /** + * * Transaction has been started but no data has been sent or received. + * */ + TRANSACTION_STARTED, + /** + * * Transaction has been started and data has been sent or received. + * */ + DATA_EXCHANGED, + /** + * * Data that has been transferred has been confirmed via its CRC. + * * Transaction is ready to be completed. + * */ + TRANSACTION_CONFIRMED, + /** + * * Transaction has been successfully completed. + * */ + TRANSACTION_COMPLETED, + /** + * * The Transaction has been canceled. + * */ + TRANSACTION_CANCELED, + /** + * * The Transaction ended in an error. 
+ * */ + TRANSACTION_ERROR +} TransactionState; + +//! Request Type +typedef enum { + NEGOTIATE_FLOWFILE_CODEC = 0, + REQUEST_PEER_LIST, + SEND_FLOWFILES, + RECEIVE_FLOWFILES, + SHUTDOWN, + MAX_REQUEST_TYPE +} RequestType; + +//! Request Type Str +static const char *RequestTypeStr[MAX_REQUEST_TYPE] = +{ + "NEGOTIATE_FLOWFILE_CODEC", + "REQUEST_PEER_LIST", + "SEND_FLOWFILES", + "RECEIVE_FLOWFILES", + "SHUTDOWN" +}; + +//! Respond Code +typedef enum { + RESERVED = 0, + // ResponseCode, so that we can indicate a 0 followed by some other bytes + + // handshaking properties + PROPERTIES_OK = 1, + UNKNOWN_PROPERTY_NAME = 230, + ILLEGAL_PROPERTY_VALUE = 231, + MISSING_PROPERTY = 232, + // transaction indicators + CONTINUE_TRANSACTION = 10, + FINISH_TRANSACTION = 11, + CONFIRM_TRANSACTION = 12, // "Explanation" of this code is the checksum + TRANSACTION_FINISHED = 13, + TRANSACTION_FINISHED_BUT_DESTINATION_FULL = 14, + CANCEL_TRANSACTION = 15, + BAD_CHECKSUM = 19, + // data availability indicators + MORE_DATA = 20, + NO_MORE_DATA = 21, + // port state indicators + UNKNOWN_PORT = 200, + PORT_NOT_IN_VALID_STATE = 201, + PORTS_DESTINATION_FULL = 202, + // authorization + UNAUTHORIZED = 240, + // error indicators + ABORT = 250, + UNRECOGNIZED_RESPONSE_CODE = 254, + END_OF_STREAM = 255 +} RespondCode; + +//! Respond Code Class +typedef struct { + RespondCode code; + const char *description; + bool hasDescription; +} RespondCodeContext; + +//! 
Respond Code Context +static RespondCodeContext respondCodeContext[] = +{ + {RESERVED, "Reserved for Future Use", false}, + {PROPERTIES_OK, "Properties OK", false}, + {UNKNOWN_PROPERTY_NAME, "Unknown Property Name", true}, + {ILLEGAL_PROPERTY_VALUE, "Illegal Property Value", true}, + {MISSING_PROPERTY, "Missing Property", true}, + {CONTINUE_TRANSACTION, "Continue Transaction", false}, + {FINISH_TRANSACTION, "Finish Transaction", false}, + {CONFIRM_TRANSACTION, "Confirm Transaction", true}, + {TRANSACTION_FINISHED, "Transaction Finished", false}, + {TRANSACTION_FINISHED_BUT_DESTINATION_FULL, "Transaction Finished But Destination is Full", false}, + {CANCEL_TRANSACTION, "Cancel Transaction", true}, + {BAD_CHECKSUM, "Bad Checksum", false}, + {MORE_DATA, "More Data Exists", false}, + {NO_MORE_DATA, "No More Data Exists", false}, + {UNKNOWN_PORT, "Unknown Port", false}, + {PORT_NOT_IN_VALID_STATE, "Port Not in a Valid State", true}, + {PORTS_DESTINATION_FULL, "Port's Destination is Full", false}, + {UNAUTHORIZED, "User Not Authorized", true}, + {ABORT, "Abort", true}, + {UNRECOGNIZED_RESPONSE_CODE, "Unrecognized Response Code", false}, + {END_OF_STREAM, "End of Stream", false} +}; + +//! Respond Code Sequence Pattern +static const uint8_t CODE_SEQUENCE_VALUE_1 = (uint8_t) 'R'; +static const uint8_t CODE_SEQUENCE_VALUE_2 = (uint8_t) 'C'; + +/** + * Enumeration of Properties that can be used for the Site-to-Site Socket + * Protocol. + */ +typedef enum { + /** + * Boolean value indicating whether or not the contents of a FlowFile should + * be GZipped when transferred. + */ + GZIP, + /** + * The unique identifier of the port to communicate with + */ + PORT_IDENTIFIER, + /** + * Indicates the number of milliseconds after the request was made that the + * client will wait for a response. If no response has been received by the + * time this value expires, the server can move on without attempting to + * service the request because the client will have already disconnected. 
+ */ + REQUEST_EXPIRATION_MILLIS, + /** + * The preferred number of FlowFiles that the server should send to the + * client when pulling data. This property was introduced in version 5 of + * the protocol. + */ + BATCH_COUNT, + /** + * The preferred number of bytes that the server should send to the client + * when pulling data. This property was introduced in version 5 of the + * protocol. + */ + BATCH_SIZE, + /** + * The preferred amount of time that the server should send data to the + * client when pulling data. This property was introduced in version 5 of + * the protocol. Value is in milliseconds. + */ + BATCH_DURATION, + MAX_HANDSHAKE_PROPERTY +} HandshakeProperty; + +//! HandShakeProperty Str +static const char *HandShakePropertyStr[MAX_HANDSHAKE_PROPERTY] = +{ + /** + * Boolean value indicating whether or not the contents of a FlowFile should + * be GZipped when transferred. + */ + "GZIP", + /** + * The unique identifier of the port to communicate with + */ + "PORT_IDENTIFIER", + /** + * Indicates the number of milliseconds after the request was made that the + * client will wait for a response. If no response has been received by the + * time this value expires, the server can move on without attempting to + * service the request because the client will have already disconnected. + */ + "REQUEST_EXPIRATION_MILLIS", + /** + * The preferred number of FlowFiles that the server should send to the + * client when pulling data. This property was introduced in version 5 of + * the protocol. + */ + "BATCH_COUNT", + /** + * The preferred number of bytes that the server should send to the client + * when pulling data. This property was introduced in version 5 of the + * protocol. + */ + "BATCH_SIZE", + /** + * The preferred amount of time that the server should send data to the + * client when pulling data. This property was introduced in version 5 of + * the protocol. Value is in milliseconds. + */ + "BATCH_DURATION" +}; + +class Site2SiteClientProtocol; + +//! 
Transaction Class +class Transaction +{ + friend class Site2SiteClientProtocol; +public: + //! Constructor + /*! + * Create a new transaction + */ + Transaction(TransferDirection direction) { + _state = TRANSACTION_STARTED; + _direction = direction; + _dataAvailable = false; + _transfers = 0; + _bytes = 0; + + char uuidStr[37]; + + // Generate the global UUID for the transaction + uuid_generate(_uuid); + uuid_unparse(_uuid, uuidStr); + _uuidStr = uuidStr; + } + //! Destructor + virtual ~Transaction() + { + } + //! getUUIDStr + std::string getUUIDStr() + { + return _uuidStr; + } + //! getState + TransactionState getState() + { + return _state; + } + //! isDataAvailable + bool isDataAvailable() + { + return _dataAvailable; + } + //! setDataAvailable() + void setDataAvailable(bool value) + { + _dataAvailable = value; + } + //! getDirection + TransferDirection getDirection() + { + return _direction; + } + //! getCRC + long getCRC() + { + return _crc.getCRC(); + } + //! updateCRC + void updateCRC(uint8_t *buffer, uint32_t length) + { + _crc.update(buffer, length); + } + +protected: + +private: + //! Transaction State + TransactionState _state; + //! Transaction Direction + TransferDirection _direction; + //! Whether received data is available + bool _dataAvailable; + //! A global unique identifier + uuid_t _uuid; + //! UUID string + std::string _uuidStr; + //! Number of transfer + int _transfers; + //! Number of content bytes + uint64_t _bytes; + //! CRC32 + CRC32 _crc; + + // Prevent default copy constructor and assignment operation + // Only support pass by reference or pointer + Transaction(const Transaction &parent); + Transaction &operator=(const Transaction &parent); +}; + +/** + * Represents a piece of data that is to be sent to or that was received from a + * NiFi instance. 
+ */ +class DataPacket +{ +public: + DataPacket(Site2SiteClientProtocol *protocol, Transaction *transaction, + std::map<std::string, std::string> attributes) { + _protocol = protocol; + _size = 0; + _transaction = transaction; + _attributes = attributes; + } + std::map<std::string, std::string> _attributes; + uint64_t _size; + Site2SiteClientProtocol *_protocol; + Transaction *_transaction; +}; + +//! Site2SiteClientProtocol Class +class Site2SiteClientProtocol +{ +public: + //! Constructor + /*! + * Create a new control protocol + */ + Site2SiteClientProtocol(Site2SitePeer *peer) { + _logger = Logger::getLogger(); + _configure = Configure::getConfigure(); + _peer = peer; + _batchSize = 0; + _batchCount = 0; + _batchDuration = 0; + _batchSendNanos = 5000000000; // 5 seconds + _timeOut = 30000; // 30 seconds + _peerState = IDLE; + _supportedVersion[0] = 5; + _supportedVersion[1] = 4; + _supportedVersion[2] = 3; + _supportedVersion[3] = 2; + _supportedVersion[4] = 1; + _currentVersion = _supportedVersion[0]; + _currentVersionIndex = 0; + _supportedCodecVersion[0] = 1; + _currentCodecVersion = _supportedCodecVersion[0]; + _currentCodecVersionIndex = 0; + } + //! Destructor + virtual ~Site2SiteClientProtocol() + { + } + +public: + //! setBatchSize + void setBatchSize(uint64_t size) + { + _batchSize = size; + } + //! setBatchCount + void setBatchCount(uint64_t count) + { + _batchCount = count; + } + //! setBatchDuration + void setBatchDuration(uint64_t duration) + { + _batchDuration = duration; + } + //! setTimeOut + void setTimeOut(uint64_t time) + { + _timeOut = time; + if (_peer) + _peer->setTimeOut(time); + + } + //! setPortId + void setPortId(uuid_t id) + { + uuid_copy(_portId, id); + char idStr[37]; + uuid_unparse(id, idStr); + _portIdStr = idStr; + } + //! getResourceName + std::string getResourceName() + { + return "SocketFlowFileProtocol"; + } + //! getCodecResourceName + std::string getCodecResourceName() + { + return "StandardFlowFileCodec"; + } + //! 
bootstrap the protocol to the ready for transaction state by going through the state machine + bool bootstrap(); + //! establish + bool establish(); + //! handShake + bool handShake(); + //! negotiateCodec + bool negotiateCodec(); + //! initiateResourceNegotiation + bool initiateResourceNegotiation(); + //! initiateCodecResourceNegotiation + bool initiateCodecResourceNegotiation(); + //! tearDown + void tearDown(); + //! write Request Type + int writeRequestType(RequestType type); + //! read Request Type + int readRequestType(RequestType &type); + //! read Respond + int readRespond(RespondCode &code, std::string &message); + //! write respond + int writeRespond(RespondCode code, std::string message); + //! getRespondCodeContext + RespondCodeContext *getRespondCodeContext(RespondCode code) + { + for (unsigned int i = 0; i < sizeof(respondCodeContext)/sizeof(RespondCodeContext); i++) + { + if (respondCodeContext[i].code == code) + { + return &respondCodeContext[i]; + } + } + return NULL; + } + //! getPeer + Site2SitePeer *getPeer() + { + return _peer; + } + //! Creation of a new transaction, return the transaction ID if success, + //! Return NULL when any error occurs + Transaction *createTransaction(std::string &transactionID, TransferDirection direction); + //! Receive the data packet from the transaction + //! Return false when any error occurs + bool receive(std::string transactionID, DataPacket *packet, bool &eof); + //! Send the data packet from the transaction + //! Return false when any error occurs + bool send(std::string transactionID, DataPacket *packet, FlowFileRecord *flowFile, ProcessSession *session); + //! Confirm the data that was sent or received by comparing CRC32's of the data sent and the data received. + bool confirm(std::string transactionID); + //! Cancel the transaction + void cancel(std::string transactionID); + //! Complete the transaction + bool complete(std::string transactionID); + //! 
Error the transaction + void error(std::string transactionID); + //! Receive flow files for the process session + void receiveFlowFiles(ProcessContext *context, ProcessSession *session); + //! Transfer flow files for the process session + void transferFlowFiles(ProcessContext *context, ProcessSession *session); + //! deleteTransaction + void deleteTransaction(std::string transactionID); + //! Nest Callback Class for write stream + class WriteCallback : public OutputStreamCallback + { + public: + WriteCallback(DataPacket *packet) + : _packet(packet) {} + DataPacket *_packet; + void process(std::ofstream *stream) { + uint8_t buffer[8192]; + int len = _packet->_size; + while (len > 0) + { + int size = std::min(len, (int) sizeof(buffer)); + int ret = _packet->_protocol->_peer->readData(buffer, size, &_packet->_transaction->_crc); + if (ret != size) + { + _packet->_protocol->_logger->log_error("Site2Site Receive Flow Size %d Failed %d", size, ret); + break; + } + stream->write((const char *) buffer, size); + len -= size; + } + } + }; + //! Nest Callback Class for read stream + class ReadCallback : public InputStreamCallback + { + public: + ReadCallback(DataPacket *packet) + : _packet(packet) {} + DataPacket *_packet; + void process(std::ifstream *stream) { + _packet->_size = 0; + uint8_t buffer[8192]; + int readSize; + while (stream->good()) + { + if (!stream->read((char *)buffer, 8192)) + readSize = stream->gcount(); + else + readSize = 8192; + int ret = _packet->_protocol->_peer->write(buffer, readSize, &_packet->_transaction->_crc); + if (ret != readSize) + { + _packet->_protocol->_logger->log_error("Site2Site Send Flow Size %d Failed %d", readSize, ret); + break; + } + _packet->_size += readSize; + } + } + }; + +protected: + +private: + //! Mutex for protection + std::mutex _mtx; + //! Logger + Logger *_logger; + //! Configure + Configure *_configure; + //! Batch Count + std::atomic<uint64_t> _batchCount; + //! Batch Size + std::atomic<uint64_t> _batchSize; + //! 
Batch Duration in msec + std::atomic<uint64_t> _batchDuration; + //! Timeout in msec + std::atomic<uint64_t> _timeOut; + //! Peer Connection + Site2SitePeer *_peer; + //! portId + uuid_t _portId; + //! portIDStr + std::string _portIdStr; + //! BATCH_SEND_NANOS + uint64_t _batchSendNanos; + //! Peer State + PeerState _peerState; + uint32_t _supportedVersion[5]; + uint32_t _currentVersion; + int _currentVersionIndex; + uint32_t _supportedCodecVersion[1]; + uint32_t _currentCodecVersion; + int _currentCodecVersionIndex; + //! commsIdentifier + std::string _commsIdentifier; + //! transaction map + std::map<std::string, Transaction *> _transactionMap; + + // Prevent default copy constructor and assignment operation + // Only support pass by reference or pointer + Site2SiteClientProtocol(const Site2SiteClientProtocol &parent); + Site2SiteClientProtocol &operator=(const Site2SiteClientProtocol &parent); +}; + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/inc/Site2SitePeer.h ---------------------------------------------------------------------- diff --git a/inc/Site2SitePeer.h b/inc/Site2SitePeer.h new file mode 100644 index 0000000..e6972ad --- /dev/null +++ b/inc/Site2SitePeer.h @@ -0,0 +1,359 @@ +/** + * @file Site2SitePeer.h + * Site2SitePeer class declaration for site to site peer + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __SITE2SITE_PEER_H__ +#define __SITE2SITE_PEER_H__ + +#include <stdio.h> +#include <unistd.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <fcntl.h> +#include <netdb.h> +#include <string> +#include <errno.h> +#include <mutex> +#include <atomic> +#include "TimeUtil.h" +#include "Logger.h" +#include "Configure.h" +#include "Property.h" + +class CRC32 +{ +public: + CRC32() { + crc = 0; + + if (tableInit) + return; + + tableInit = true; + unsigned int poly = 0xedb88320; + unsigned int temp = 0; + for(unsigned int i = 0; i < 256; ++i) { + temp = i; + for(int j = 8; j > 0; --j) { + if((temp & 1) == 1) { + temp = (unsigned int)((temp >> 1) ^ poly); + }else { + temp >>= 1; + } + } + table[i] = temp; + } + } + + unsigned int update(uint8_t * bytes, size_t size) { + crc = crc ^ ~0U; + for(unsigned int i = 0; i < size; ++i) { + uint8_t index = (uint8_t)(((crc) & 0xff) ^ bytes[i]); + crc = (unsigned int)((crc >> 8) ^ table[index]); + } + crc = crc ^ ~0U; + return crc; + } + + long getCRC() + { + return crc; + } + +private: + static unsigned int table[256]; + static std::atomic<bool> tableInit; + unsigned int crc; +}; + +static const char MAGIC_BYTES[] = {'N', 'i', 'F', 'i'}; + +//! Site2SitePeer Class +class Site2SitePeer +{ +public: + //! Constructor + /*! 
+ * Create a new site2site peer + */ + Site2SitePeer(std::string host, uint16_t port) { + _logger = Logger::getLogger(); + _configure = Configure::getConfigure(); + _socket = 0; + _host = host; + _port = port; + _yieldExpiration = 0; + _timeOut = 30000; // 30 seconds + _url = "nifi://" + _host + ":" + std::to_string(_port); + } + //! Destructor + virtual ~Site2SitePeer() { Close();} + //! Set Processor yield period in MilliSecond + void setYieldPeriodMsec(uint64_t period) { + _yieldPeriodMsec = period; + } + //! get URL + std::string getURL() { + return _url; + } + //! Get Processor yield period in MilliSecond + uint64_t getYieldPeriodMsec(void) { + return(_yieldPeriodMsec); + } + //! Yield based on the yield period + void yield() + { + _yieldExpiration = (getTimeMillis() + _yieldPeriodMsec); + } + //! setHostName + void setHostName(std::string host) + { + _host = host; + _url = "nifi://" + _host + ":" + std::to_string(_port); + } + //! setPort + void setPort(uint16_t port) + { + _port = port; + _url = "nifi://" + _host + ":" + std::to_string(_port); + } + //! getHostName + std::string getHostName() + { + return _host; + } + //! getPort + uint16_t getPort() + { + return _port; + } + //! Yield based on the input time + void yield(uint64_t time) + { + _yieldExpiration = (getTimeMillis() + time); + } + //! whether need be to yield + bool isYield() + { + if (_yieldExpiration > 0) + return (_yieldExpiration >= getTimeMillis()); + else + return false; + } + // clear yield expiration + void clearYield() + { + _yieldExpiration = 0; + } + //! Yield based on the yield period + void yield(std::string portId) + { + std::lock_guard<std::mutex> lock(_mtx); + uint64_t yieldExpiration = (getTimeMillis() + _yieldPeriodMsec); + _yieldExpirationPortIdMap[portId] = yieldExpiration; + } + //! 
Yield based on the input time + void yield(std::string portId, uint64_t time) + { + std::lock_guard<std::mutex> lock(_mtx); + uint64_t yieldExpiration = (getTimeMillis() + time); + _yieldExpirationPortIdMap[portId] = yieldExpiration; + } + //! whether need be to yield + bool isYield(std::string portId) + { + std::lock_guard<std::mutex> lock(_mtx); + std::map<std::string, uint64_t>::iterator it = this->_yieldExpirationPortIdMap.find(portId); + if (it != _yieldExpirationPortIdMap.end()) + { + uint64_t yieldExpiration = it->second; + return (yieldExpiration >= getTimeMillis()); + } + else + { + return false; + } + } + //! clear yield expiration + void clearYield(std::string portId) + { + std::lock_guard<std::mutex> lock(_mtx); + std::map<std::string, uint64_t>::iterator it = this->_yieldExpirationPortIdMap.find(portId); + if (it != _yieldExpirationPortIdMap.end()) + { + _yieldExpirationPortIdMap.erase(portId); + } + } + //! setTimeOut + void setTimeOut(uint64_t time) + { + _timeOut = time; + } + int write(uint8_t value, CRC32 *crc = NULL) + { + return sendData(&value, 1, crc); + } + int write(char value, CRC32 *crc = NULL) + { + return sendData((uint8_t *)&value, 1, crc); + } + int write(uint32_t value, CRC32 *crc = NULL) + { + uint8_t temp[4]; + + temp[0] = (value & 0xFF000000) >> 24; + temp[1] = (value & 0x00FF0000) >> 16; + temp[2] = (value & 0x0000FF00) >> 8; + temp[3] = (value & 0x000000FF); + return sendData(temp, 4, crc); + } + int write(uint16_t value, CRC32 *crc = NULL) + { + uint8_t temp[2]; + temp[0] = (value & 0xFF00) >> 8; + temp[1] = (value & 0xFF); + return sendData(temp, 2, crc); + } + int write(uint8_t *value, int len, CRC32 *crc = NULL) + { + return sendData(value, len, crc); + } + int write(uint64_t value, CRC32 *crc = NULL) + { + uint8_t temp[8]; + + temp[0] = (value >> 56) & 0xFF; + temp[1] = (value >> 48) & 0xFF; + temp[2] = (value >> 40) & 0xFF; + temp[3] = (value >> 32) & 0xFF; + temp[4] = (value >> 24) & 0xFF; + temp[5] = (value >> 16) & 0xFF; 
+ temp[6] = (value >> 8) & 0xFF; + temp[7] = (value >> 0) & 0xFF; + return sendData(temp, 8, crc); + } + int write(bool value, CRC32 *crc = NULL) + { + uint8_t temp = value; + return write(temp, crc); + } + int writeUTF(std::string str, bool widen = false, CRC32 *crc = NULL); + int read(uint8_t &value, CRC32 *crc = NULL) + { + uint8_t buf; + + int ret = readData(&buf, 1, crc); + if (ret == 1) + value = buf; + return ret; + } + int read(uint16_t &value, CRC32 *crc = NULL) + { + uint8_t buf[2]; + + int ret = readData(buf, 2, crc); + if (ret == 2) + value = (buf[0] << 8) | buf[1]; + return ret; + } + int read(char &value, CRC32 *crc = NULL) + { + uint8_t buf; + + int ret = readData(&buf, 1, crc); + if (ret == 1) + value = (char) buf; + return ret; + } + int read(uint8_t *value, int len, CRC32 *crc = NULL) + { + return readData(value, len, crc); + } + int read(uint32_t &value, CRC32 *crc = NULL) + { + uint8_t buf[4]; + + int ret = readData(buf, 4, crc); + if (ret == 4) + value = (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3]; + return ret; + } + int read(uint64_t &value, CRC32 *crc = NULL) + { + uint8_t buf[8]; + + int ret = readData(buf, 8, crc); + if (ret == 8) + { + value = ((uint64_t) buf[0] << 56) | + ((uint64_t) (buf[1] & 255) << 48) | + ((uint64_t) (buf[2] & 255) << 40) | + ((uint64_t) (buf[3] & 255) << 32) | + ((uint64_t) (buf[4] & 255) << 24) | + ((uint64_t) (buf[5] & 255) << 16) | + ((uint64_t) (buf[6] & 255) << 8) | + ((uint64_t) (buf[7] & 255) << 0); + } + return ret; + } + int readUTF(std::string &str, bool widen = false, CRC32 *crc = NULL); + //! open connection to the peer + bool Open(); + //! close connection to the peer + void Close(); + //! Send Data via the socket, return -1 for failure + int sendData(uint8_t *buf, int buflen, CRC32 *crc = NULL); + //! Read length into buf, return -1 for failure and 0 for EOF + int readData(uint8_t *buf, int buflen, CRC32 *crc = NULL); + //! 
Select on the socket + int Select(int msec); + +protected: + +private: + //! Mutex for protection + std::mutex _mtx; + //! S2S server Name + std::string _host; + //! S2S server port + uint16_t _port; + //! socket to server + int _socket; + //! URL + std::string _url; + //! socket timeout; + std::atomic<uint64_t> _timeOut; + //! Logger + Logger *_logger; + //! Configure + Configure *_configure; + //! Yield Period in Milliseconds + std::atomic<uint64_t> _yieldPeriodMsec; + //! Yield Expiration + std::atomic<uint64_t> _yieldExpiration; + //! Yield Expiration per destination PortID + std::map<std::string, uint64_t> _yieldExpirationPortIdMap; + // Prevent default copy constructor and assignment operation + // Only support pass by reference or pointer + Site2SitePeer(const Site2SitePeer &parent); + Site2SitePeer &operator=(const Site2SitePeer &parent); +}; + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/inc/TimeUtil.h ---------------------------------------------------------------------- diff --git a/inc/TimeUtil.h b/inc/TimeUtil.h index 0c9bac4..b024245 100644 --- a/inc/TimeUtil.h +++ b/inc/TimeUtil.h @@ -73,7 +73,7 @@ inline std::string getTimeStr(uint64_t msec) std::string ret = date; date[0] = '\0'; - sprintf(date, ".%03llu", msec); + sprintf(date, ".%03llu", (unsigned long long) msec); ret += date; return ret; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/main/MiNiFiMain.cpp ---------------------------------------------------------------------- diff --git a/main/MiNiFiMain.cpp b/main/MiNiFiMain.cpp index 4d9db5d..1ceaba5 100644 --- a/main/MiNiFiMain.cpp +++ b/main/MiNiFiMain.cpp @@ -52,7 +52,7 @@ void sigHandler(int signal) int main(int argc, char **argv) { Logger *logger = Logger::getLogger(); - logger->setLogLevel(info); + logger->setLogLevel(trace); logger->log_info("MiNiFi started"); if (signal(SIGINT, sigHandler) == SIG_ERR || signal(SIGTERM, sigHandler) == SIG_ERR) 
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/src/FlowControlProtocol.cpp ---------------------------------------------------------------------- diff --git a/src/FlowControlProtocol.cpp b/src/FlowControlProtocol.cpp index 6aaa969..6f1517c 100644 --- a/src/FlowControlProtocol.cpp +++ b/src/FlowControlProtocol.cpp @@ -215,6 +215,8 @@ int FlowControlProtocol::readHdr(FlowControlProtocolHeader *hdr) void FlowControlProtocol::start() { + if (_reportInterval <= 0) + return; if (_running) return; _running = true; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/src/FlowController.cpp ---------------------------------------------------------------------- diff --git a/src/FlowController.cpp b/src/FlowController.cpp index a2cafbc..b18953e 100644 --- a/src/FlowController.cpp +++ b/src/FlowController.cpp @@ -136,6 +136,10 @@ Processor *FlowController::createProcessor(std::string name, uuid_t uuid) { processor = new LogAttribute(name, uuid); } + else if (name == RealTimeDataCollector::ProcessorName) + { + processor = new RealTimeDataCollector(name, uuid); + } else { _logger->log_error("No Processor defined for %s", name.c_str()); @@ -269,10 +273,20 @@ void FlowController::parseConnection(xmlDoc *doc, xmlNode *node, ProcessGroup *p if (temp) { std::string relationshipName = temp; - Relationship relationship(relationshipName, ""); - _logger->log_debug("parseConnection: relationship => [%s]", relationshipName.c_str()); - if (connection) - connection->setRelationship(relationship); + if (!relationshipName.empty()) + { + Relationship relationship(relationshipName, ""); + _logger->log_debug("parseConnection: relationship => [%s]", relationshipName.c_str()); + if (connection) + connection->setRelationship(relationship); + } + else + { + Relationship empty; + _logger->log_debug("parseConnection: relationship => [%s]", empty.getName().c_str()); + if (connection) + connection->setRelationship(empty); + } xmlFree(temp); } } @@ -334,9 +348,110 @@ 
void FlowController::parseRootProcessGroup(xmlDoc *doc, xmlNode *node) { this->parseConnection(doc, currentNode, group); } + else if (xmlStrcmp(currentNode->name, BAD_CAST "remoteProcessGroup") == 0) + { + this->parseRemoteProcessGroup(doc, currentNode, group); + } } // if (currentNode->type == XML_ELEMENT_NODE) } // for node +} +void FlowController::parseRemoteProcessGroup(xmlDoc *doc, xmlNode *node, ProcessGroup *parent) +{ + uuid_t uuid; + xmlNode *currentNode; + ProcessGroup *group = NULL; + int64_t yieldPeriod = -1; + int64_t timeOut = -1; + + // generate the random UIID + uuid_generate(uuid); + + for (currentNode = node->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) + { + if (currentNode->type == XML_ELEMENT_NODE) + { + if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0) + { + char *id = (char *) xmlNodeGetContent(currentNode); + if (id) + { + _logger->log_debug("parseRootProcessGroup: id => [%s]", id); + uuid_parse(id, uuid); + xmlFree(id); + } + } + else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) + { + char *name = (char *) xmlNodeGetContent(currentNode); + if (name) + { + _logger->log_debug("parseRemoteProcessGroup: name => [%s]", name); + group = this->createRemoteProcessGroup(name, uuid); + if (group == NULL) + { + xmlFree(name); + return; + } + group->setParent(parent); + parent->addProcessGroup(group); + xmlFree(name); + } + } + else if (xmlStrcmp(currentNode->name, BAD_CAST "yieldPeriod") == 0) + { + TimeUnit unit; + char *temp = (char *) xmlNodeGetContent(currentNode); + if (temp) + { + if (Property::StringToTime(temp, yieldPeriod, unit) && + Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod) && group) + { + _logger->log_debug("parseRemoteProcessGroup: yieldPeriod => [%d] ms", yieldPeriod); + group->setYieldPeriodMsec(yieldPeriod); + } + xmlFree(temp); + } + } + else if (xmlStrcmp(currentNode->name, BAD_CAST "timeout") == 0) + { + TimeUnit unit; + char *temp = (char *) xmlNodeGetContent(currentNode); 
+ if (temp) + { + if (Property::StringToTime(temp, timeOut, unit) && + Property::ConvertTimeUnitToMS(timeOut, unit, timeOut) && group) + { + _logger->log_debug("parseRemoteProcessGroup: timeOut => [%d] ms", timeOut); + group->setTimeOut(timeOut); + } + xmlFree(temp); + } + } + else if (xmlStrcmp(currentNode->name, BAD_CAST "transmitting") == 0) + { + char *temp = (char *) xmlNodeGetContent(currentNode); + bool transmitting; + if (temp) + { + if (Property::StringToBool(temp, transmitting) && group) + { + _logger->log_debug("parseRemoteProcessGroup: transmitting => [%d]", transmitting); + group->setTransmitting(transmitting); + } + xmlFree(temp); + } + } + else if (xmlStrcmp(currentNode->name, BAD_CAST "inputPort") == 0 && group) + { + this->parsePort(doc, currentNode, group, SEND); + } + else if (xmlStrcmp(currentNode->name, BAD_CAST "outputPort") == 0 && group) + { + this->parsePort(doc, currentNode, group, RECEIVE); + } + } // if (currentNode->type == XML_ELEMENT_NODE) + } // for node } void FlowController::parseProcessorProperty(xmlDoc *doc, xmlNode *node, Processor *processor) @@ -383,6 +498,106 @@ void FlowController::parseProcessorProperty(xmlDoc *doc, xmlNode *node, Processo } // for node } +void FlowController::parsePort(xmlDoc *doc, xmlNode *processorNode, ProcessGroup *parent, TransferDirection direction) +{ + char *id = NULL; + char *name = NULL; + uuid_t uuid; + xmlNode *currentNode; + Processor *processor = NULL; + RemoteProcessorGroupPort *port = NULL; + + if (!parent) + { + _logger->log_error("parseProcessNode: no parent group existed"); + return; + } + // generate the random UIID + uuid_generate(uuid); + + for (currentNode = processorNode->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) + { + if (currentNode->type == XML_ELEMENT_NODE) + { + if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0) + { + id = (char *) xmlNodeGetContent(currentNode); + if (id) + { + _logger->log_debug("parseProcessorNode: id => [%s]", id); + 
uuid_parse(id, uuid); + xmlFree(id); + } + } + else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) + { + name = (char *) xmlNodeGetContent(currentNode); + if (name) + { + _logger->log_debug("parseProcessorNode: name => [%s]", name); + port = new RemoteProcessorGroupPort(name, uuid); + processor = (Processor *) port; + if (processor == NULL) + { + xmlFree(name); + return; + } + port->setDirection(direction); + port->setTimeOut(parent->getTimeOut()); + port->setTransmitting(parent->getTransmitting()); + processor->setYieldPeriodMsec(parent->getYieldPeriodMsec()); + processor->initialize(); + // add processor to parent + parent->addProcessor(processor); + xmlFree(name); + } + } + else if (xmlStrcmp(currentNode->name, BAD_CAST "scheduledState") == 0) + { + char *temp = (char *) xmlNodeGetContent(currentNode); + if (temp) + { + std::string state = temp; + if (state == "DISABLED") + { + _logger->log_debug("parseProcessorNode: scheduledState => [%s]", state.c_str()); + processor->setScheduledState(DISABLED); + } + if (state == "STOPPED") + { + _logger->log_debug("parseProcessorNode: scheduledState => [%s]", state.c_str()); + processor->setScheduledState(STOPPED); + } + if (state == "RUNNING") + { + _logger->log_debug("parseProcessorNode: scheduledState => [%s]", state.c_str()); + processor->setScheduledState(RUNNING); + } + xmlFree(temp); + } + } + else if (xmlStrcmp(currentNode->name, BAD_CAST "maxConcurrentTasks") == 0) + { + char *temp = (char *) xmlNodeGetContent(currentNode); + if (temp) + { + int64_t maxConcurrentTasks; + if (Property::StringToInt(temp, maxConcurrentTasks)) + { + _logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks); + processor->setMaxConcurrentTasks(maxConcurrentTasks); + } + xmlFree(temp); + } + } + else if (xmlStrcmp(currentNode->name, BAD_CAST "property") == 0) + { + this->parseProcessorProperty(doc, currentNode, processor); + } + } // if (currentNode->type == XML_ELEMENT_NODE) + } // while node +} + 
void FlowController::parseProcessorNode(xmlDoc *doc, xmlNode *processorNode, ProcessGroup *parent) { char *id = NULL; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/src/GenerateFlowFile.cpp ---------------------------------------------------------------------- diff --git a/src/GenerateFlowFile.cpp b/src/GenerateFlowFile.cpp index 16181de..4b0603d 100644 --- a/src/GenerateFlowFile.cpp +++ b/src/GenerateFlowFile.cpp @@ -115,6 +115,7 @@ void GenerateFlowFile::onTrigger(ProcessContext *context, ProcessSession *sessio { int randValue = random(); *((int *) current) = randValue; + // *((int *) current) = (0xFFFFFFFF & i); current += sizeof(int); } } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/src/LogAttribute.cpp ---------------------------------------------------------------------- diff --git a/src/LogAttribute.cpp b/src/LogAttribute.cpp index 67ed74e..82130f8 100644 --- a/src/LogAttribute.cpp +++ b/src/LogAttribute.cpp @@ -109,7 +109,7 @@ void LogAttribute::onTrigger(ProcessContext *context, ProcessSession *session) message << "\n" << "Payload:" << "\n"; ReadCallback callback(flow->getSize()); session->read(flow, &callback); - for (int i = 0, j = 0; i < callback._readSize; i++) + for (unsigned int i = 0, j = 0; i < callback._readSize; i++) { char temp[8]; sprintf(temp, "%02x ", (unsigned char) (callback._buffer[i])); @@ -147,9 +147,10 @@ void LogAttribute::onTrigger(ProcessContext *context, ProcessSession *session) } // Test Import + /* FlowFileRecord *importRecord = session->create(); session->import(claim->getContentFullPath(), importRecord); - session->transfer(importRecord, Success); + session->transfer(importRecord, Success); */ // Transfer to the relationship http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/src/ProcessGroup.cpp ---------------------------------------------------------------------- diff --git a/src/ProcessGroup.cpp b/src/ProcessGroup.cpp index a5fd773..d44cc99 100644 --- 
a/src/ProcessGroup.cpp +++ b/src/ProcessGroup.cpp @@ -40,6 +40,9 @@ ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid, else uuid_copy(_uuid, uuid); + _yieldPeriodMsec = 0; + _transmitting = false; + _logger = Logger::getLogger(); _logger->log_info("ProcessGroup %s created", _name.c_str()); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/src/ProcessSession.cpp ---------------------------------------------------------------------- diff --git a/src/ProcessSession.cpp b/src/ProcessSession.cpp index 32d8920..2628ae3 100644 --- a/src/ProcessSession.cpp +++ b/src/ProcessSession.cpp @@ -118,8 +118,6 @@ FlowFileRecord* ProcessSession::cloneDuringTransfer(FlowFileRecord *parent) record->_size = parent->_size; record->_claim->increaseFlowFileRecordOwnedCount(); } - // Copy connection - record->_connection = parent->_connection; } return record; @@ -132,7 +130,7 @@ FlowFileRecord* ProcessSession::clone(FlowFileRecord *parent, long offset, long { if (parent->_claim) { - if ((offset + size) > parent->_size) + if ((offset + size) > (long) parent->_size) { // Set offset and size _logger->log_error("clone offset %d and size %d exceed parent size %d", @@ -486,7 +484,12 @@ void ProcessSession::commit() else { // Clone the flow file and route to the connection - this->cloneDuringTransfer(record); + FlowFileRecord *cloneRecord; + cloneRecord = this->cloneDuringTransfer(record); + if (cloneRecord) + cloneRecord->_connection = connection; + else + throw Exception(PROCESS_SESSION_EXCEPTION, "Can not clone the flow for transfer"); } } } @@ -540,7 +543,12 @@ void ProcessSession::commit() else { // Clone the flow file and route to the connection - this->cloneDuringTransfer(record); + FlowFileRecord *cloneRecord; + cloneRecord = this->cloneDuringTransfer(record); + if (cloneRecord) + cloneRecord->_connection = connection; + else + throw Exception(PROCESS_SESSION_EXCEPTION, "Can not clone the flow for transfer"); } } } @@ -627,7 +635,10 @@ 
void ProcessSession::rollback() { FlowFileRecord *record = it->second; if (record->_orginalConnection) + { + record->_snapshot = false; record->_orginalConnection->put(record); + } else delete record; } @@ -685,6 +696,7 @@ FlowFileRecord *ProcessSession::get() if (ret) { // add the flow record to the current process session update map + ret->_markedDelete = false; _updatedFlowFiles[ret->getUUIDStr()] = ret; std::map<std::string, std::string> empty; FlowFileRecord *snapshot = new FlowFileRecord(empty);
