[
https://issues.apache.org/jira/browse/MINIFICPP-39?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16211001#comment-16211001
]
ASF GitHub Bot commented on MINIFICPP-39:
-----------------------------------------
Github user phrocker commented on a diff in the pull request:
https://github.com/apache/nifi-minifi-cpp/pull/148#discussion_r145692128
--- Diff: libminifi/src/processors/FocusArchiveEntry.cpp ---
@@ -0,0 +1,340 @@
+/**
+ * @file FocusArchiveEntry.cpp
+ * FocusArchiveEntry class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "processors/FocusArchiveEntry.h"
+
+#include <archive.h>
+#include <archive_entry.h>
+
+#include <string.h>
+
+#include <boost/filesystem.hpp>
+
+#include <string>
+#include <set>
+
+#include <iostream>
+#include <fstream>
+#include <memory>
+
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+
+#include "json/json.h"
+#include "json/writer.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// Property naming the archive entry to focus on; read in onTrigger through
+// Path.getName(). The third argument is an empty string (presumably the
+// default value -- confirm against core::Property).
+core::Property FocusArchiveEntry::Path(
+    "Path",
+    "The path within the archive to focus (\"/\" to focus the total archive)",
+    "");
+// Relationship that onTrigger transfers the flow file to.
+core::Relationship FocusArchiveEntry::Success(
+    "success",
+    "success operational on the flow record");
+
+/**
+ * Sets, replaces or removes a single attribute on a flow file.
+ *
+ * @param flowFile flow file whose attribute is modified
+ * @param key      attribute name
+ * @param value    new attribute value; nullptr requests removal of the key
+ * @return the result of removeAttribute when value is nullptr; otherwise
+ *         true if updateAttribute succeeded, else the result of addAttribute
+ */
+bool FocusArchiveEntry::set_del_or_update_attr(std::shared_ptr<core::FlowFile> flowFile, const std::string key, std::string* value) const {
+  // No value supplied: the caller wants the attribute gone.
+  if (value == nullptr) {
+    return flowFile->removeAttribute(key);
+  }
+
+  // Try an in-place update first and fall back to adding the attribute only
+  // when the update reports failure (presumably because the key is absent).
+  const bool updated = flowFile->updateAttribute(key, *value);
+  if (updated) {
+    return true;
+  }
+  return flowFile->addAttribute(key, *value);
+}
+
+/**
+ * Registers what this processor supports: the Path property and the
+ * Success relationship.
+ */
+void FocusArchiveEntry::initialize() {
+  //! Supported properties: only Path
+  std::set<core::Property> supportedProperties{Path};
+  setSupportedProperties(supportedProperties);
+
+  //! Supported relationships: only Success
+  std::set<core::Relationship> supportedRelationships{Success};
+  setSupportedRelationships(supportedRelationships);
+}
+
+/**
+ * Focuses a single entry of the archive carried by the next flow file.
+ *
+ * Steps, in order:
+ *  1. Take a flow file from the session; return silently if there is none.
+ *  2. Run ReadCallback over its content to fill an ArchiveMetadata record.
+ *  3. For every entry of type AE_IFREG, import its temp file as the flow
+ *     file content and stash it under a freshly generated UUID key.
+ *  4. Restore the stash whose entry name equals the Path property, or log a
+ *     warning when no entry matched.
+ *  5. Append a description of the archive to the JSON array stored in the
+ *     "lens.archive.stack" attribute.
+ *  6. Point "filename", "path" and "absolute.path" at the focused entry and
+ *     transfer the flow file to Success.
+ *
+ * If an existing "lens.archive.stack" attribute cannot be parsed as JSON the
+ * error is logged, the context yields and the function returns without
+ * transferring the flow file.
+ *
+ * @param context used to read the Path property and to yield on JSON errors
+ * @param session used to get, read, import, stash, restore and transfer
+ */
+void FocusArchiveEntry::onTrigger(core::ProcessContext *context,
+                                  core::ProcessSession *session) {
+  auto flowFile = session->get();
+
+  if (!flowFile) {
+    return;
+  }
+
+  std::string targetEntry;
+  context->getProperty(Path.getName(), targetEntry);
+
+  // Extract archive contents
+  ArchiveMetadata archiveMetadata;
+  archiveMetadata.focusedEntry = targetEntry;
+  ReadCallback cb(&archiveMetadata);
+  session->read(flowFile, &cb);
+
+  // For each extracted entry, import & stash to key
+  std::string targetEntryStashKey;
+
+  for (auto &entryMetadata : archiveMetadata.entryMetadata) {
+    if (entryMetadata.entryType == AE_IFREG) {
+      logger_->log_info("FocusArchiveEntry importing %s from %s",
+                        entryMetadata.entryName.c_str(),
+                        entryMetadata.tmpFileName.c_str());
+      session->import(entryMetadata.tmpFileName, flowFile, false, 0);
+
+      // 36 characters of textual UUID plus the terminating NUL.
+      char stashKey[37];
+      uuid_t stashKeyUuid;
+      uuid_generate(stashKeyUuid);
+      uuid_unparse_lower(stashKeyUuid, stashKey);
+      logger_->log_debug(
+          "FocusArchiveEntry generated stash key %s for entry %s",
+          stashKey,
+          entryMetadata.entryName.c_str());
+      entryMetadata.stashKey.assign(stashKey);
+
+      // Remember which stash holds the entry the user asked for.
+      if (entryMetadata.entryName == targetEntry) {
+        targetEntryStashKey = entryMetadata.stashKey;
+      }
+
+      // Stash the content
+      session->stash(entryMetadata.stashKey, flowFile);
+    }
+  }
+
+  // Restore target archive entry
+  if (!targetEntryStashKey.empty()) {
+    session->restore(targetEntryStashKey, flowFile);
+  } else {
+    logger_->log_warn(
+        "FocusArchiveEntry failed to locate target entry: %s",
+        targetEntry.c_str());
+  }
+
+  // Set new/updated lens stack to attribute
+  {
+    Json::Value lensStack;
+    Json::Reader reader;
+
+    std::string existingLensStack;
+
+    if (flowFile->getAttribute("lens.archive.stack", existingLensStack)) {
+      logger_->log_info("FocusArchiveEntry loading existing lens context");
+      if (!reader.parse(existingLensStack, lensStack)) {
+        // getFormattedErrorMessages() returns a std::string; pass its
+        // C string to the %s format like every other log call here.
+        logger_->log_error("FocusArchiveEntry JSON parse error: %s",
+                           reader.getFormattedErrorMessages().c_str());
+        context->yield();
+        return;
+      }
+    } else {
+      lensStack = Json::Value(Json::arrayValue);
+    }
+
+    // One JSON object per archive entry, in extraction order.
+    Json::Value structVal(Json::arrayValue);
+
+    for (const auto &entryMetadata : archiveMetadata.entryMetadata) {
+      Json::Value entryVal(Json::objectValue);
+      entryVal["entry_name"] = Json::Value(entryMetadata.entryName);
+      entryVal["entry_type"] = Json::Value(entryMetadata.entryType);
+      entryVal["entry_perm"] = Json::Value(entryMetadata.entryPerm);
+      entryVal["entry_size"] = Json::Value(entryMetadata.entrySize);
+      entryVal["entry_uid"] = Json::Value(entryMetadata.entryUID);
+      entryVal["entry_gid"] = Json::Value(entryMetadata.entryGID);
+      entryVal["entry_mtime"] = Json::Value(entryMetadata.entryMTime);
+      entryVal["entry_mtime_nsec"] = Json::Value(entryMetadata.entryMTimeNsec);
+
+      // Only regular files were stashed above, so only they have a key.
+      if (entryMetadata.entryType == AE_IFREG) {
+        entryVal["stash_key"] = Json::Value(entryMetadata.stashKey);
+      }
+
+      structVal.append(entryVal);
+    }
+
+    // archive_name stays JSON null when the flow file has no filename.
+    std::string archivenameStr;
+    Json::Value archiveName {Json::nullValue};
+
+    if (flowFile->getAttribute("filename", archivenameStr)) {
+      archiveName = Json::Value(archivenameStr);
+    }
+
+    Json::Value lensVal(Json::objectValue);
+    lensVal["archive_format_name"] = Json::Value(archiveMetadata.archiveFormatName);
+    lensVal["archive_name"] = archiveName;
+    lensVal["focused_entry"] = Json::Value(archiveMetadata.focusedEntry);
+    lensVal["archive_format"] = Json::Value(archiveMetadata.archiveFormat);
+    lensVal["archive_structure"] = structVal;
+    lensStack.append(lensVal);
+
+    Json::FastWriter writer;
+    std::string stackStr = writer.write(lensStack);
+
+    if (!flowFile->updateAttribute("lens.archive.stack", stackStr)) {
+      flowFile->addAttribute("lens.archive.stack", stackStr);
+    }
+  }
+
+  // Update filename attribute to that of focused entry
+  // NOTE(review): when targetEntry contains no '/' or '\\', find_last_of
+  // returns npos, so both path and name end up equal to the whole
+  // targetEntry -- confirm whether an empty path is wanted in that case.
+  std::size_t found = targetEntry.find_last_of("/\\");
+  std::string path = targetEntry.substr(0, found);
+  std::string name = targetEntry.substr(found + 1);
+  set_del_or_update_attr(flowFile, "filename", &name);
+  set_del_or_update_attr(flowFile, "path", &path);
+  set_del_or_update_attr(flowFile, "absolute.path", &targetEntry);
+
+  // Transfer to the relationship
+  session->transfer(flowFile, Success);
+}
+
+// Client data handed to libarchive through archive_read_open: the flow file
+// content stream plus the scratch buffer that the read callback fills and
+// exposes to libarchive.
+struct FocusArchiveEntryReadData {
+  // Source stream the read callback pulls bytes from.
+  std::shared_ptr<io::BaseStream> stream;
+  // Scratch buffer; the read callback hard-codes the same 8196 size.
+  // NOTE(review): 8196 may have been meant as 8192 -- confirm.
+  char buf[8196];
+};
+
+int64_t
FocusArchiveEntry::ReadCallback::process(std::shared_ptr<io::BaseStream>
stream) {
+ auto inputArchive = archive_read_new();
+ struct archive_entry *entry;
+ int64_t nlen = 0;
+
+ FocusArchiveEntryReadData data;
+ data.stream = stream;
+
+ archive_read_support_format_all(inputArchive);
+ archive_read_support_filter_all(inputArchive);
+
+ // Read callback which reads from ifstream
+ auto read = [] (archive *, void *d, const void **buf) -> int64_t {
+ auto data = static_cast<FocusArchiveEntryReadData *>(d);
+ *buf = data->buf;
+ int64_t read = 0;
+ int64_t last_read = 0;
+
+ do {
+ last_read = data->stream->readData(reinterpret_cast<uint8_t
*>(data->buf), 8196 - read);
+ read += last_read;
+ } while (last_read > 0 && read < 8196);
+
+ return read;
+ };
+
+ // Close callback for libarchive
+ auto close = [] (archive *, void *) -> int {
+ // Because we do not need to close the stream, do nothing & return
success
+ return 0;
+ };
+
+ // Read each item in the archive
+ int res;
+
+ if ((res = archive_read_open(inputArchive, &data, NULL, read, close))) {
+ logger_->log_error(
+ "FocusArchiveEntry can't open due to archive error: %s",
+ archive_error_string(inputArchive));
+ return nlen;
+ }
+
+ for (;;) {
--- End diff --
^Same as above...isRunning() ?
> Create FocusArchive processor
> -----------------------------
>
> Key: MINIFICPP-39
> URL: https://issues.apache.org/jira/browse/MINIFICPP-39
> Project: NiFi MiNiFi C++
> Issue Type: Task
> Reporter: Andrew Christianson
> Assignee: Andrew Christianson
> Priority: Minor
>
> Create a FocusArchive processor which implements a lens over an archive
> (tar, etc.). A concise, though informal, definition of a lens is as follows:
> "Essentially, they represent the act of “peering into” or “focusing in on”
> some particular piece/path of a complex data object such that you can more
> precisely target particular operations without losing the context or
> structure of the overall data you’re working with."
> https://medium.com/@dtipson/functional-lenses-d1aba9e52254#.hdgsvbraq
> Why a FocusArchive in MiNiFi? Simply put, it will enable us to "focus in on"
> an entry in the archive, perform processing *in-context* of that entry, then
> re-focus on the overall archive. This allows for transformation or other
> processing of an entry in the archive without losing the overall context of
> the archive.
> Initial format support is tar, due to its simplicity and ubiquity.
> Attributes:
> - Path (the path in the archive to focus; "/" to re-focus the overall archive)
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)