Github user minifirocks commented on a diff in the pull request:
https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r143551088
--- Diff: libminifi/include/processors/MergeContent.h ---
@@ -125,6 +127,127 @@ class BinaryConcatenationMerge : public MergeBin {
};
+// Archive Class
+class ArchiveMerge {
+public:
+ // Nest Callback Class for read stream
+ class ReadCallback: public InputStreamCallback {
+ public:
+ ReadCallback(uint64_t size, struct archive *arch, struct archive_entry
*entry) :
+ buffer_size_(size), arch_(arch), entry_(entry) {
+ }
+ ~ReadCallback() {
+ }
+ int64_t process(std::shared_ptr<io::BaseStream> stream) {
+ uint8_t buffer[buffer_size_];
+ int64_t ret = 0;
+ uint64_t read_size;
+ ret = stream->read(buffer, buffer_size_);
+ if (!stream)
+ read_size = stream->getSize();
+ else
+ read_size = buffer_size_;
+ ret = archive_write_header(arch_, entry_);
+ ret += archive_write_data(arch_, buffer, read_size);
+ return ret;
+ }
+ uint64_t buffer_size_;
+ struct archive *arch_;
+ struct archive_entry *entry_;
+ };
+ // Nest Callback Class for write stream
+ class WriteCallback: public OutputStreamCallback {
+ public:
+ WriteCallback(std::string merge_type,
std::deque<std::shared_ptr<core::FlowFile>> &flows, core::ProcessSession
*session) :
+ merge_type_(merge_type), flows_(flows), session_(session) {
+ size_ = 0;
+ stream_ = nullptr;
+ }
+ ~WriteCallback() {
+ }
+
+ std::string merge_type_;
+ std::deque<std::shared_ptr<core::FlowFile>> &flows_;
+ core::ProcessSession *session_;
+ std::shared_ptr<io::BaseStream> stream_;
+ int64_t size_;
+
+ static la_ssize_t archive_write(struct archive *arch, void *context,
const void *buff, size_t size) {
+ WriteCallback *callback = (WriteCallback *) context;
+ la_ssize_t ret =
callback->stream_->write(reinterpret_cast<uint8_t*>(const_cast<void*>(buff)),
size);
+ if (ret > 0)
+ callback->size_ += (int64_t) ret;
+ return ret;
+ }
+
+ int64_t process(std::shared_ptr<io::BaseStream> stream) {
+ int64_t ret = 0;
+ struct archive *arch;
+
+ arch = archive_write_new();
+ if (merge_type_ == MERGE_FORMAT_TAR_VALUE) {
+ archive_write_set_format_pax_restricted(arch); // tar format
+ }
+ if (merge_type_ == MERGE_FORMAT_ZIP_VALUE) {
+ archive_write_set_format_zip(arch); // zip format
+ }
+ archive_write_set_bytes_per_block(arch, 0);
+ archive_write_add_filter_none(arch);
+ this->stream_ = stream;
+ archive_write_open(arch, this, NULL, archive_write, NULL);
+
+ for (auto flow : flows_) {
+ struct archive_entry *entry = archive_entry_new();
+ std::string fileName;
+ flow->getAttribute(FlowAttributeKey(FILENAME), fileName);
+ archive_entry_set_pathname(entry, fileName.c_str());
+ archive_entry_set_size(entry, flow->getSize());
+ archive_entry_set_mode(entry, S_IFREG | 0755);
+ if (merge_type_ == MERGE_FORMAT_TAR_VALUE) {
+ std::string perm;
+ int permInt;
+ if (flow->getAttribute(BinFiles::TAR_PERMISSIONS_ATTRIBUTE,
perm)) {
+ try {
+ permInt = std::stoi(perm);
+ archive_entry_set_perm(entry, (mode_t) permInt);
+ } catch (...) {
--- End diff --
will do.
---