[
https://issues.apache.org/jira/browse/MINIFICPP-39?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16210997#comment-16210997
]
ASF GitHub Bot commented on MINIFICPP-39:
-----------------------------------------
Github user phrocker commented on a diff in the pull request:
https://github.com/apache/nifi-minifi-cpp/pull/148#discussion_r145691328
--- Diff: libminifi/src/core/ProcessSession.cpp ---
@@ -799,6 +799,152 @@ void ProcessSession::import(std::string source, std::shared_ptr<core::FlowFile>
}
}
+bool ProcessSession::exportContent(
+ const std::string &destination,
+ const std::string &tmpFile,
+ std::shared_ptr<core::FlowFile> &flow,
+ bool keepContent) {
+ logger_->log_info(
+ "Exporting content of %s to %s",
+ flow->getUUIDStr().c_str(),
+ destination.c_str());
+
+ ReadCallback cb(tmpFile, destination, logger_);
+ read(flow, &cb);
+
+ logger_->log_info("Committing %s", destination.c_str());
+ bool commit_ok = cb.commit();
+
+ if (commit_ok) {
+ logger_->log_info("Commit OK.");
+ } else {
+ logger_->log_error(
+ "Commit of %s to %s failed!",
+ flow->getUUIDStr().c_str(),
+ destination.c_str());
+ }
+ return commit_ok;
+}
+
+bool ProcessSession::exportContent(
+ const std::string &destination,
+ std::shared_ptr<core::FlowFile> &flow,
+ bool keepContent) {
+ std::string tmpFileName = boost::filesystem::unique_path().native();
+ return exportContent(destination, tmpFileName, flow, keepContent);
+}
+
+ProcessSession::ReadCallback::ReadCallback(const std::string &tmpFile,
+ const std::string &destFile,
+ std::shared_ptr<logging::Logger> logger)
+ : _tmpFile(tmpFile),
+ _tmpFileOs(tmpFile, std::ios::binary),
+ _destFile(destFile),
+ logger_(logger) {
+}
+
+// Copy the entire file contents to the temporary file
+int64_t ProcessSession::ReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
+ // Copy file contents into tmp file
+ _writeSucceeded = false;
+ size_t size = 0;
+ uint8_t buffer[8192];
+ do {
+ int read = stream->read(buffer, 8192);
+ if (read < 0) {
+ return -1;
+ }
+ if (read == 0) {
+ break;
+ }
+ _tmpFileOs.write(reinterpret_cast<char*>(buffer), read);
+ size += read;
+ } while (size < stream->getSize());
+ _writeSucceeded = true;
+ return size;
+}
+
+// Renames tmp file to final destination
+// Returns true if commit succeeded
+bool ProcessSession::ReadCallback::commit() {
+ bool success = false;
+
+ logger_->log_info("committing export operation to %s", _destFile.c_str());
+
+ if (_writeSucceeded) {
+ _tmpFileOs.close();
+
+ if (rename(_tmpFile.c_str(), _destFile.c_str())) {
+ logger_->log_info("commit export operation to %s failed because rename() call failed", _destFile.c_str());
+ } else {
+ success = true;
+ logger_->log_info("commit export operation to %s succeeded", _destFile.c_str());
+ }
+ } else {
+ logger_->log_error("commit export operation to %s failed because write failed", _destFile.c_str());
+ }
+ return success;
+}
+
+// Clean up resources
+ProcessSession::ReadCallback::~ReadCallback() {
+ // Close tmp file
+ _tmpFileOs.close();
+
+ // Clean up tmp file, if necessary
+ unlink(_tmpFile.c_str());
+}
+
+
+void ProcessSession::stash(const std::string &key, std::shared_ptr<core::FlowFile> flow) {
--- End diff --
What are the ramifications if power is lost and the RocksDB repo's WAL
(write-ahead log) causes us to repeat this flow? Would the previous tmp file
be left around?
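To make the concern concrete: the temp name comes from `boost::filesystem::unique_path()`, so a replayed flow would pick a fresh name, and the destructor that unlinks the old temp file never runs when power is lost. The following is a minimal sketch of one possible mitigation, not the PR's code. It assumes C++17 `std::filesystem` instead of Boost, and the helpers `tmpPathFor` and `exportViaTempFile` are hypothetical names. The temp path is derived from the destination and the flow UUID, so a repeated export truncates and reuses the same file instead of leaving an orphan.

```cpp
#include <cassert>
#include <filesystem>
#include <fstream>
#include <string>
#include <system_error>

namespace fs = std::filesystem;

// Hypothetical helper (not in the PR): a deterministic temp path built from the
// destination and the flow UUID, so a replayed flow maps to the same temp file.
fs::path tmpPathFor(const fs::path &destination, const std::string &flowUuid) {
  assert(!flowUuid.empty());
  fs::path tmp = destination;
  tmp += "." + flowUuid + ".tmp";
  return tmp;
}

// Hypothetical helper (not in the PR): write to the temp file, then rename it
// over the destination. Returns true only if the rename succeeded.
bool exportViaTempFile(const fs::path &destination,
                       const std::string &flowUuid,
                       const std::string &content) {
  const fs::path tmp = tmpPathFor(destination, flowUuid);
  std::error_code ec;
  {
    // std::ios::trunc discards any partial file left by an interrupted run.
    std::ofstream os(tmp, std::ios::binary | std::ios::trunc);
    os.write(content.data(), static_cast<std::streamsize>(content.size()));
    os.flush();
    if (!os) {
      os.close();
      fs::remove(tmp, ec);  // best-effort cleanup of the failed write
      return false;
    }
  }  // the stream is closed here, before the rename
  fs::rename(tmp, destination, ec);
  if (ec) {
    std::error_code ignored;
    fs::remove(tmp, ignored);
    return false;
  }
  return true;
}
```

This only addresses the orphaned-file question. It does not sync the file to disk, so it makes no durability claim across a power loss, and it assumes the flow UUID is stable across a replay.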
> Create FocusArchive processor
> -----------------------------
>
> Key: MINIFICPP-39
> URL: https://issues.apache.org/jira/browse/MINIFICPP-39
> Project: NiFi MiNiFi C++
> Issue Type: Task
> Reporter: Andrew Christianson
> Assignee: Andrew Christianson
> Priority: Minor
>
> Create a FocusArchive processor which implements a lens over an archive
> (tar, etc.). A concise, though informal, definition of a lens is as follows:
> "Essentially, they represent the act of “peering into” or “focusing in on”
> some particular piece/path of a complex data object such that you can more
> precisely target particular operations without losing the context or
> structure of the overall data you’re working with."
> https://medium.com/@dtipson/functional-lenses-d1aba9e52254#.hdgsvbraq
> Why a FocusArchive in MiNiFi? Simply put, it will enable us to "focus in on"
> an entry in the archive, perform processing *in-context* of that entry, then
> re-focus on the overall archive. This allows for transformation or other
> processing of an entry in the archive without losing the overall context of
> the archive.
> Initial format support is tar, due to its simplicity and ubiquity.
> Attributes:
> - Path (the path in the archive to focus; "/" to re-focus the overall archive)
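The lens idea in the issue description can be illustrated with a small sketch. It is not the processor's design: the archive is modelled as an in-memory map from entry path to content rather than a real tar file, and `Archive` and `EntryLens` are hypothetical names. `get` focuses on one entry, `set` re-focuses by returning the whole archive with that entry replaced, and `over` applies a transformation to the entry while every other entry is carried along unchanged.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>
#include <utility>

// Assumption for illustration: an archive is just entry path -> entry content.
using Archive = std::map<std::string, std::string>;

// Hypothetical lens over a single archive entry.
struct EntryLens {
  std::string path;

  // Focus: read the entry at `path` (throws std::out_of_range if it is absent).
  const std::string &get(const Archive &archive) const {
    return archive.at(path);
  }

  // Re-focus: return a copy of the archive with the entry replaced.
  Archive set(Archive archive, std::string value) const {
    archive[path] = std::move(value);
    return archive;
  }

  // Transform the focused entry in the context of the whole archive.
  Archive over(const Archive &archive,
               const std::function<std::string(const std::string &)> &f) const {
    return set(archive, f(get(archive)));
  }
};
```

A caller would build `EntryLens{"a.txt"}` and pass a function to `over`; the input archive is left untouched and the result differs only in the focused entry.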