[ https://issues.apache.org/jira/browse/MINIFICPP-39?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16211262#comment-16211262 ]

ASF GitHub Bot commented on MINIFICPP-39:
-----------------------------------------

Github user achristianson commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/148#discussion_r145743112
  
    --- Diff: libminifi/src/core/ProcessSession.cpp ---
    @@ -799,6 +799,152 @@ void ProcessSession::import(std::string source, std::shared_ptr<core::FlowFile>
       }
     }
     
    +bool ProcessSession::exportContent(
    +    const std::string &destination,
    +    const std::string &tmpFile,
    +    std::shared_ptr<core::FlowFile> &flow,
    +    bool keepContent) {
    +  logger_->log_info(
    +      "Exporting content of %s to %s",
    +      flow->getUUIDStr().c_str(),
    +      destination.c_str());
    +
    +  ReadCallback cb(tmpFile, destination, logger_);
    +  read(flow, &cb);
    +
    +  logger_->log_info("Committing %s", destination.c_str());
    +  bool commit_ok = cb.commit();
    +
    +  if (commit_ok) {
    +    logger_->log_info("Commit OK.");
    +  } else {
    +    logger_->log_error(
    +      "Commit of %s to %s failed!",
    +      flow->getUUIDStr().c_str(),
    +      destination.c_str());
    +  }
    +  return commit_ok;
    +}
    +
    +bool ProcessSession::exportContent(
    +    const std::string &destination,
    +    std::shared_ptr<core::FlowFile> &flow,
    +    bool keepContent) {
    +  std::string tmpFileName = boost::filesystem::unique_path().native();
    +  return exportContent(destination, tmpFileName, flow, keepContent);
    +}
    +
    +ProcessSession::ReadCallback::ReadCallback(const std::string &tmpFile,
    +                                           const std::string &destFile,
    +                                           std::shared_ptr<logging::Logger> logger)
    +    : _tmpFile(tmpFile),
    +      _tmpFileOs(tmpFile, std::ios::binary),
    +      _destFile(destFile),
    +      logger_(logger) {
    +}
    +
    +// Copy the entire file contents to the temporary file
    +int64_t ProcessSession::ReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
    +  // Copy file contents into tmp file
    +  _writeSucceeded = false;
    +  size_t size = 0;
    +  uint8_t buffer[8192];
    +  do {
    +    int read = stream->read(buffer, 8192);
    +    if (read < 0) {
    +      return -1;
    +    }
    +    if (read == 0) {
    +      break;
    +    }
    +    _tmpFileOs.write(reinterpret_cast<char*>(buffer), read);
    +    size += read;
    +  } while (size < stream->getSize());
    +  _writeSucceeded = true;
    +  return size;
    +}
    +
    +// Renames tmp file to final destination
    +// Returns true if commit succeeded
    +bool ProcessSession::ReadCallback::commit() {
    +  bool success = false;
    +
    +  logger_->log_info("committing export operation to %s", 
_destFile.c_str());
    +
    +  if (_writeSucceeded) {
    +    _tmpFileOs.close();
    +
    +    if (rename(_tmpFile.c_str(), _destFile.c_str())) {
    +      logger_->log_info("commit export operation to %s failed because 
rename() call failed", _destFile.c_str());
    +    } else {
    +      success = true;
    +      logger_->log_info("commit export operation to %s succeeded", 
_destFile.c_str());
    +    }
    +  } else {
    +    logger_->log_error("commit export operation to %s failed because write failed", _destFile.c_str());
    +  }
    +  return success;
    +}
    +
    +// Clean up resources
    +ProcessSession::ReadCallback::~ReadCallback() {
    +  // Close tmp file
    +  _tmpFileOs.close();
    +
    +  // Clean up tmp file, if necessary
    +  unlink(_tmpFile.c_str());
    +}
    +
    +
    +void ProcessSession::stash(const std::string &key, std::shared_ptr<core::FlowFile> flow) {
    --- End diff --
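
    The exportContent path in the diff streams the flow file's content into a temporary file and only renames it onto the destination once the write has fully succeeded, so a partially written file never appears at the destination path. A minimal standalone sketch of that write-then-rename pattern (hypothetical function and path names; assumes POSIX rename semantics, i.e. both paths on the same filesystem):

    #include <cstdio>   // std::rename, std::remove
    #include <fstream>
    #include <string>

    // Writes 'data' to 'tmpPath', then moves it onto 'destPath' in one step.
    // Returns true only if both the write and the rename succeeded.
    bool writeThenRename(const std::string &data,
                         const std::string &tmpPath,
                         const std::string &destPath) {
      {
        std::ofstream tmp(tmpPath, std::ios::binary);
        tmp.write(data.data(), data.size());
        if (!tmp) {
          std::remove(tmpPath.c_str());  // drop the partial temp file
          return false;
        }
      }  // tmp is flushed and closed here
      // std::rename returns 0 on success, matching the check in commit()
      if (std::rename(tmpPath.c_str(), destPath.c_str()) != 0) {
        std::remove(tmpPath.c_str());
        return false;
      }
      return true;
    }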
    
    @phrocker by 'tmp file,' are you referring to the stash claims?
    
    We would want those to operate the same as the primary content claim. It 
looks like this may not be the case currently. Would adding the stash claims to 
the data stored/retrieved in FlowFileRecord's Serialize/Deserialize cover all 
the bases, or would other changes be required as well?
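
    For illustration, persisting the stash claims alongside the primary content claim could look roughly like the sketch below. This uses a simplified, hypothetical claim representation and a plain std::ostream; the actual FlowFileRecord Serialize/DeSerialize code uses MiNiFi's own stream types and record layout, so this is only meant to show the shape of the data that would need to survive a restart.

    #include <cstdint>
    #include <map>
    #include <ostream>
    #include <string>

    // Hypothetical, simplified view of a content claim reference.
    struct ClaimRef {
      std::string contentPath;  // path of the resource claim
      uint64_t offset;
      uint64_t size;
    };

    // Writes a length-prefixed string.
    static void writeString(std::ostream &os, const std::string &s) {
      uint32_t len = static_cast<uint32_t>(s.size());
      os.write(reinterpret_cast<const char *>(&len), sizeof(len));
      os.write(s.data(), len);
    }

    static void writeClaim(std::ostream &os, const ClaimRef &c) {
      writeString(os, c.contentPath);
      os.write(reinterpret_cast<const char *>(&c.offset), sizeof(c.offset));
      os.write(reinterpret_cast<const char *>(&c.size), sizeof(c.size));
    }

    // Serializes the primary claim followed by each stash claim keyed by its
    // stash key, so both can be restored when the record is deserialized.
    void serializeClaims(std::ostream &os,
                         const ClaimRef &primaryClaim,
                         const std::map<std::string, ClaimRef> &stashClaims) {
      writeClaim(os, primaryClaim);
      uint32_t count = static_cast<uint32_t>(stashClaims.size());
      os.write(reinterpret_cast<const char *>(&count), sizeof(count));
      for (const auto &kv : stashClaims) {
        writeString(os, kv.first);   // stash key
        writeClaim(os, kv.second);   // claim the key maps to
      }
    }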


> Create FocusArchive processor
> -----------------------------
>
>                 Key: MINIFICPP-39
>                 URL: https://issues.apache.org/jira/browse/MINIFICPP-39
>             Project: NiFi MiNiFi C++
>          Issue Type: Task
>            Reporter: Andrew Christianson
>            Assignee: Andrew Christianson
>            Priority: Minor
>
> Create a FocusArchive processor which implements a lens over an archive 
> (tar, etc.). A concise, though informal, definition of a lens is as follows:
> "Essentially, they represent the act of “peering into” or “focusing in on” 
> some particular piece/path of a complex data object such that you can more 
> precisely target particular operations without losing the context or 
> structure of the overall data you’re working with." 
> https://medium.com/@dtipson/functional-lenses-d1aba9e52254#.hdgsvbraq
> Why a FocusArchive in MiNiFi? Simply put, it will enable us to "focus in on" 
> an entry in the archive, perform processing *in-context* of that entry, then 
> re-focus on the overall archive. This allows for transformation or other 
> processing of an entry in the archive without losing the overall context of 
> the archive.
> Initial format support is tar, due to its simplicity and ubiquity.
> Attributes:
> - Path (the path in the archive to focus; "/" to re-focus the overall archive)
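
As an illustration of the "focus on one entry" idea for the initial tar support, the sketch below pulls the content of a single named entry out of a tar archive with libarchive. It is only a sketch: libarchive is an assumption here, the function and parameter names are hypothetical, and the eventual processor would route the extracted bytes as flow file content rather than returning them in a buffer.

    #include <archive.h>
    #include <archive_entry.h>
    #include <string>
    #include <vector>

    // Reads the content of one entry (the "focused" path) from a tar archive.
    // Returns true and fills 'out' if the entry was found and read completely.
    bool focusEntry(const std::string &archivePath,
                    const std::string &entryPath,
                    std::vector<char> &out) {
      struct archive *a = archive_read_new();
      archive_read_support_format_tar(a);
      if (archive_read_open_filename(a, archivePath.c_str(), 10240) != ARCHIVE_OK) {
        archive_read_free(a);
        return false;
      }
      struct archive_entry *entry = nullptr;
      bool found = false;
      while (archive_read_next_header(a, &entry) == ARCHIVE_OK) {
        const char *name = archive_entry_pathname(entry);
        if (name != nullptr && entryPath == name) {
          char buf[8192];
          la_ssize_t n;
          while ((n = archive_read_data(a, buf, sizeof(buf))) > 0) {
            out.insert(out.end(), buf, buf + n);
          }
          found = (n >= 0);  // a negative return indicates a read error
          break;
        }
        archive_read_data_skip(a);  // not the focused entry; skip its data
      }
      archive_read_free(a);
      return found;
    }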


