Github user achristianson commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/148#discussion_r145743112
  
    --- Diff: libminifi/src/core/ProcessSession.cpp ---
    @@ -799,6 +799,152 @@ void ProcessSession::import(std::string source, 
std::shared_ptr<core::FlowFile>
       }
     }
     
    +bool ProcessSession::exportContent(
    +    const std::string &destination,
    +    const std::string &tmpFile,
    +    std::shared_ptr<core::FlowFile> &flow,
    +    bool keepContent) {
    +  logger_->log_info(
    +      "Exporting content of %s to %s",
    +      flow->getUUIDStr().c_str(),
    +      destination.c_str());
    +
    +  ReadCallback cb(tmpFile, destination, logger_);
    +  read(flow, &cb);
    +
    +  logger_->log_info("Committing %s", destination.c_str());
    +  bool commit_ok = cb.commit();
    +
    +  if (commit_ok) {
    +    logger_->log_info("Commit OK.");
    +  } else {
    +    logger_->log_error(
    +      "Commit of %s to %s failed!",
    +      flow->getUUIDStr().c_str(),
    +      destination.c_str());
    +  }
    +  return commit_ok;
    +}
    +
    +bool ProcessSession::exportContent(
    +    const std::string &destination,
    +    std::shared_ptr<core::FlowFile> &flow,
    +    bool keepContent) {
    +  std::string tmpFileName = boost::filesystem::unique_path().native();
    +  return exportContent(destination, tmpFileName, flow, keepContent);
    +}
    +
    +ProcessSession::ReadCallback::ReadCallback(const std::string &tmpFile,
    +                                           const std::string &destFile,
    +                                           
std::shared_ptr<logging::Logger> logger)
    +    : _tmpFile(tmpFile),
    +      _tmpFileOs(tmpFile, std::ios::binary),
    +      _destFile(destFile),
    +      logger_(logger) {
    +}
    +
    +// Copy the entire file contents to the temporary file
    +int64_t 
ProcessSession::ReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
    +  // Copy file contents into tmp file
    +  _writeSucceeded = false;
    +  size_t size = 0;
    +  uint8_t buffer[8192];
    +  do {
    +    int read = stream->read(buffer, 8192);
    +    if (read < 0) {
    +      return -1;
    +    }
    +    if (read == 0) {
    +      break;
    +    }
    +    _tmpFileOs.write(reinterpret_cast<char*>(buffer), read);
    +    size += read;
    +  } while (size < stream->getSize());
    +  _writeSucceeded = true;
    +  return size;
    +}
    +
    +// Renames tmp file to final destination
    +// Returns true if commit succeeded
    +bool ProcessSession::ReadCallback::commit() {
    +  bool success = false;
    +
    +  logger_->log_info("committing export operation to %s", 
_destFile.c_str());
    +
    +  if (_writeSucceeded) {
    +    _tmpFileOs.close();
    +
    +    if (rename(_tmpFile.c_str(), _destFile.c_str())) {
    +      logger_->log_info("commit export operation to %s failed because 
rename() call failed", _destFile.c_str());
    +    } else {
    +      success = true;
    +      logger_->log_info("commit export operation to %s succeeded", 
_destFile.c_str());
    +    }
    +  } else {
    +    logger_->log_error("commit export operation to %s failed because write 
failed", _destFile.c_str());
    +  }
    +  return success;
    +}
    +
    +// Clean up resources
    +ProcessSession::ReadCallback::~ReadCallback() {
    +  // Close tmp file
    +  _tmpFileOs.close();
    +
    +  // Clean up tmp file, if necessary
    +  unlink(_tmpFile.c_str());
    +}
    +
    +
    +void ProcessSession::stash(const std::string &key, 
std::shared_ptr<core::FlowFile> flow) {
    --- End diff --
    
    @phrocker by 'tmp file,' are you referring to the stash claims?
    
    We would want those to operate the same as the primary content claim. It 
looks like this may not be the case currently. Would adding the stash claims to 
the data stored/retrieved in FlowFileRecord's Serialize/Deserialize cover all 
the bases, or would other changes be required as well?


---

Reply via email to