Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 6b317fb4f -> c45f05e51


MINIFI-181 Created initial implementation of PutFile

This closes #39.

Signed-off-by: Aldrin Piri <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/c45f05e5
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/c45f05e5
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/c45f05e5

Branch: refs/heads/master
Commit: c45f05e51e78a5dfe9c69ce231ec579e7736ed2d
Parents: 6b317fb
Author: Andrew I. Christianson <[email protected]>
Authored: Thu Jan 5 19:15:54 2017 +0000
Committer: Aldrin Piri <[email protected]>
Committed: Fri Jan 13 16:22:49 2017 -0500

----------------------------------------------------------------------
 libminifi/include/FlowController.h |   1 +
 libminifi/include/PutFile.h        |  88 ++++++++++++++
 libminifi/src/FlowController.cpp   |   4 +
 libminifi/src/PutFile.cpp          | 200 ++++++++++++++++++++++++++++++++
 4 files changed, 293 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c45f05e5/libminifi/include/FlowController.h
----------------------------------------------------------------------
diff --git a/libminifi/include/FlowController.h 
b/libminifi/include/FlowController.h
index 49629a2..ee8bb4f 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -49,6 +49,7 @@
 #include "RemoteProcessorGroupPort.h"
 #include "Provenance.h"
 #include "GetFile.h"
+#include "PutFile.h"
 #include "TailFile.h"
 #include "ListenSyslog.h"
 #include "ExecuteProcess.h"

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c45f05e5/libminifi/include/PutFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/PutFile.h b/libminifi/include/PutFile.h
new file mode 100644
index 0000000..9f1375d
--- /dev/null
+++ b/libminifi/include/PutFile.h
@@ -0,0 +1,88 @@
+/**
+ * @file PutFile.h
+ * PutFile class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PUT_FILE_H__
+#define __PUT_FILE_H__
+
+#include "FlowFileRecord.h"
+#include "Processor.h"
+#include "ProcessSession.h"
+
+//! PutFile Class
+class PutFile : public Processor
+{
+public:
+
+       static const std::string CONFLICT_RESOLUTION_STRATEGY_REPLACE;
+       static const std::string CONFLICT_RESOLUTION_STRATEGY_IGNORE;
+       static const std::string CONFLICT_RESOLUTION_STRATEGY_FAIL;
+
+       //! Constructor
+       /*!
+        * Create a new processor
+        */
+       PutFile(std::string name, uuid_t uuid = NULL)
+       : Processor(name, uuid)
+       {
+               _logger = Logger::getLogger();
+       }
+       //! Destructor
+       virtual ~PutFile()
+       {
+       }
+       //! Processor Name
+       static const std::string ProcessorName;
+       //! Supported Properties
+       static Property Directory;
+       static Property ConflictResolution;
+       //! Supported Relationships
+       static Relationship Success;
+       static Relationship Failure;
+
+       //! OnTrigger method, implemented by NiFi PutFile
+       virtual void onTrigger(ProcessContext *context, ProcessSession 
*session);
+       //! Initialize, over write by NiFi PutFile
+       virtual void initialize(void);
+
+       class ReadCallback : public InputStreamCallback
+       {
+       public:
+               ReadCallback(const std::string &tmpFile, const std::string 
&destFile);
+               ~ReadCallback();
+               virtual void process(std::ifstream *stream);
+               bool commit();
+
+       private:
+               Logger *_logger;
+               std::ofstream _tmpFileOs;
+               bool _writeSucceeded = false;
+               std::string _tmpFile;
+               std::string _destFile;
+       };
+
+protected:
+
+private:
+       //! Logger
+       Logger *_logger;
+
+       bool putFile(ProcessSession *session, FlowFileRecord *flowFile, const 
std::string &tmpFile, const std::string &destFile);
+};
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c45f05e5/libminifi/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 455788c..caaa8ea 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -183,6 +183,10 @@ Processor *FlowController::createProcessor(std::string 
name, uuid_t uuid)
        {
         processor = new GetFile(name, uuid);
        }
+       else if (name == PutFile::ProcessorName)
+       {
+        processor = new PutFile(name, uuid);
+       }
        else if (name == TailFile::ProcessorName)
        {
         processor = new TailFile(name, uuid);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c45f05e5/libminifi/src/PutFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/PutFile.cpp b/libminifi/src/PutFile.cpp
new file mode 100644
index 0000000..3f209ce
--- /dev/null
+++ b/libminifi/src/PutFile.cpp
@@ -0,0 +1,200 @@
+/**
+ * @file PutFile.cpp
+ * PutFile class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <sstream>
+#include <stdio.h>
+#include <string>
+#include <iostream>
+#include <fstream>
+#include <uuid/uuid.h>
+
+#include "TimeUtil.h"
+#include "PutFile.h"
+#include "ProcessContext.h"
+#include "ProcessSession.h"
+
+const std::string PutFile::CONFLICT_RESOLUTION_STRATEGY_REPLACE("replace");
+const std::string PutFile::CONFLICT_RESOLUTION_STRATEGY_IGNORE("ignore");
+const std::string PutFile::CONFLICT_RESOLUTION_STRATEGY_FAIL("fail");
+
+const std::string PutFile::ProcessorName("PutFile");
+
+Property PutFile::Directory("Output Directory", "The output directory to which 
to put files", ".");
+Property PutFile::ConflictResolution("Conflict Resolution Strategy", 
"Indicates what should happen when a file with the same name already exists in 
the output directory", CONFLICT_RESOLUTION_STRATEGY_FAIL);
+
+Relationship PutFile::Success("success", "All files are routed to success");
+Relationship PutFile::Failure("failure", "Failed files (conflict, write 
failure, etc.) are transferred to failure");
+
+void PutFile::initialize()
+{
+       //! Set the supported properties
+       std::set<Property> properties;
+       properties.insert(Directory);
+       properties.insert(ConflictResolution);
+       setSupportedProperties(properties);
+       //! Set the supported relationships
+       std::set<Relationship> relationships;
+       relationships.insert(Success);
+       relationships.insert(Failure);
+       setSupportedRelationships(relationships);
+}
+
+void PutFile::onTrigger(ProcessContext *context, ProcessSession *session)
+{
+       std::string directory;
+
+       if (!context->getProperty(Directory.getName(), directory))
+       {
+               _logger->log_error("Directory attribute is missing or invalid");
+               return;
+       }
+
+       std::string conflictResolution;
+
+       if (!context->getProperty(ConflictResolution.getName(), 
conflictResolution))
+       {
+               _logger->log_error("Conflict Resolution Strategy attribute is 
missing or invalid");
+               return;
+       }
+
+       FlowFileRecord *flowFile = session->get();
+
+       // Do nothing if there are no incoming files
+       if (!flowFile)
+       {
+               return;
+       }
+
+       std::string filename;
+       flowFile->getAttribute(FILENAME, filename);
+
+       // Generate a safe (universally-unique) temporary filename on the same 
partition 
+       char tmpFileUuidStr[37];
+       uuid_t tmpFileUuid;
+       uuid_generate(tmpFileUuid);
+       uuid_unparse(tmpFileUuid, tmpFileUuidStr);
+       std::stringstream tmpFileSs;
+       tmpFileSs << directory << "/." << filename << "." << tmpFileUuidStr;
+       std::string tmpFile = tmpFileSs.str();
+       _logger->log_info("PutFile using temporary file %s", tmpFile.c_str());
+
+       // Determine dest full file paths
+       std::stringstream destFileSs;
+       destFileSs << directory << "/" << filename;
+       std::string destFile = destFileSs.str();
+
+       _logger->log_info("PutFile writing file %s into directory %s", 
filename.c_str(), directory.c_str());
+
+       // If file exists, apply conflict resolution strategy
+       struct stat statResult;
+
+       if (stat(destFile.c_str(), &statResult) == 0)
+       {
+               _logger->log_info("Destination file %s exists; applying 
Conflict Resolution Strategy: %s", destFile.c_str(), 
conflictResolution.c_str());
+
+               if (conflictResolution == CONFLICT_RESOLUTION_STRATEGY_REPLACE)
+               {
+                       putFile(session, flowFile, tmpFile, destFile);
+               }
+               else if (conflictResolution == 
CONFLICT_RESOLUTION_STRATEGY_IGNORE)
+               {
+                       session->transfer(flowFile, Success);
+               }
+               else
+               {
+                       session->transfer(flowFile, Failure);
+               }
+       }
+       else
+       {
+               putFile(session, flowFile, tmpFile, destFile);
+       }
+}
+
+bool PutFile::putFile(ProcessSession *session, FlowFileRecord *flowFile, const 
std::string &tmpFile, const std::string &destFile)
+{
+
+       ReadCallback cb(tmpFile, destFile);
+       session->read(flowFile, &cb);
+
+       if (cb.commit())
+       {
+               session->transfer(flowFile, Success);
+       }
+       else
+       {
+               session->transfer(flowFile, Failure);
+       }
+}
+
+PutFile::ReadCallback::ReadCallback(const std::string &tmpFile, const 
std::string &destFile)
+: _tmpFile(tmpFile)
+, _tmpFileOs(tmpFile)
+, _destFile(destFile)
+{
+       _logger = Logger::getLogger();
+}
+
+// Copy the entire file contents to the temporary file
+void PutFile::ReadCallback::process(std::ifstream *stream)
+{
+       // Copy file contents into tmp file
+       _writeSucceeded = false;
+       _tmpFileOs << stream->rdbuf();
+       _writeSucceeded = true;
+}
+
+// Renames tmp file to final destination
+// Returns true if commit succeeded
+bool PutFile::ReadCallback::commit()
+{
+       bool success = false;
+
+       _logger->log_info("PutFile committing put file operation to %s", 
_destFile.c_str());
+
+       if (_writeSucceeded)
+       {
+               _tmpFileOs.close();
+
+               if (rename(_tmpFile.c_str(), _destFile.c_str()))
+               {
+                       _logger->log_info("PutFile commit put file operation to 
%s failed because rename() call failed", _destFile.c_str());
+               }
+               else
+               {
+                       success = true;
+                       _logger->log_info("PutFile commit put file operation to 
%s succeeded", _destFile.c_str());
+               }
+       }
+       else
+       {
+               _logger->log_error("PutFile commit put file operation to %s 
failed because write failed", _destFile.c_str());
+       }
+
+       return success;
+}
+
+// Clean up resources
+PutFile::ReadCallback::~ReadCallback() {
+       // Close tmp file
+       _tmpFileOs.close();
+
+       // Clean up tmp file, if necessary
+       unlink(_tmpFile.c_str());
+}

Reply via email to