arpadboda commented on a change in pull request #613: Minificpp 927 Nanofi 
tailfile delimited processor
URL: https://github.com/apache/nifi-minifi-cpp/pull/613#discussion_r304290602
 
 

 ##########
 File path: nanofi/src/api/ecu.c
 ##########
 @@ -0,0 +1,266 @@
+#include "api/ecu.h"
+#include "api/nanofi.h"
+#include "core/string_utils.h"
+#include "core/cstructs.h"
+#include "core/file_utils.h"
+#include "core/flowfiles.h"
+
+#include <unistd.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <errno.h>
+#include <limits.h>
+#include <signal.h>
+#include <sys/stat.h>
+
+/* Module-level state shared by the functions in this file. */
+/* Handed to write_to_flow_file() whenever a flow file is created. */
+nifi_instance * instance = NULL;
+/* Handed to write_to_flow_file() whenever a flow file is created. */
+standalone_processor * proc = NULL;
+/* Flow files produced by the most recent on_trigger_tailfilechunk() run;
+ * released at the start of the next run via free_flow_file_list(). */
+flow_file_list * ff_list = NULL;
+/* Byte offset the next chunk read starts from (see set_offset/get_offset). */
+uint64_t curr_offset = 0;
+/* Written from signal_handler(), hence volatile sig_atomic_t.
+ * NOTE(review): presumably polled by a run loop elsewhere - confirm. */
+volatile sig_atomic_t stopped = 0;
+/* NOTE(review): not referenced in the visible part of this file. */
+int flow_file_exists = 0;
+/* NOTE(review): not referenced in the visible part of this file. */
+flow_file_record * ffr_delim = NULL;
+
+/**
+ * Signal handler that requests shutdown by raising the `stopped` flag.
+ *
+ * Only SIGINT and SIGTERM have an effect; any other signal number is ignored.
+ *
+ * @param signum number of the signal being delivered
+ */
+void signal_handler(int signum) {
+    switch (signum) {
+        case SIGINT:
+        case SIGTERM:
+            stopped = 1;
+            break;
+        default:
+            break;
+    }
+}
+
+/**
+ * Set the byte offset at which the next chunk read begins.
+ *
+ * @param offset new read position, in bytes from the start of the file
+ */
+void set_offset(uint64_t offset)
+{
+    curr_offset = offset;
+}
+
+/**
+ * Return the byte offset at which the next chunk read will start.
+ *
+ * Declared with (void) so the definition is a real prototype; an empty
+ * parameter list means "unspecified arguments" before C23.
+ */
+uint64_t get_offset(void) {
+    return curr_offset;
+}
+
+/**
+ * Processor callback: emit fixed-size chunks appended to a file.
+ *
+ * Reads the "file_path" and "chunk_size" properties from ctx, seeks to
+ * curr_offset and turns every complete chunk of chunk_size bytes into a flow
+ * file appended to ff_list. Each flow file gets a "current offset" attribute
+ * holding the byte offset at which that chunk started. A trailing partial
+ * chunk is not emitted and curr_offset is not advanced past it, so it is
+ * read again on a later trigger.
+ *
+ * Flow files produced by the previous invocation are released first.
+ *
+ * @param ps  processor session (unused here)
+ * @param ctx processor context the properties are read from
+ */
+void on_trigger_tailfilechunk(processor_session * ps, processor_context * ctx) {
+    char file_path[4096];
+    char chunk_size[50];
+
+    free_flow_file_list(&ff_list);
+
+    if (get_property(ctx, "file_path", file_path, sizeof(file_path)) != 0) {
+        return;
+    }
+
+    if (get_property(ctx, "chunk_size", chunk_size, sizeof(chunk_size)) != 0) {
+        return;
+    }
+
+    /* errno alone does not catch input without digits (strtoul returns 0),
+     * a zero chunk size, or ULONG_MAX, for which the "+ 1" in the buffer
+     * allocation below would wrap around to 0. */
+    errno = 0;
+    char * end = NULL;
+    unsigned long chunk_size_value = strtoul(chunk_size, &end, 10);
+
+    if (errno != 0 || end == chunk_size || chunk_size_value == 0 || chunk_size_value == ULONG_MAX) {
+        printf("Invalid chunk size specified\n");
+        return;
+    }
+
+    /* fseek() takes a long; larger offsets would convert to a bogus value. */
+    if (curr_offset > (uint64_t)LONG_MAX) {
+        printf("Offset out of range. {file: %s}\n", file_path);
+        return;
+    }
+
+    FILE * fp = fopen(file_path, "rb");
+
+    if (!fp) {
+        printf("Unable to open file. {file: %s, reason: %s}\n", file_path, strerror(errno));
+        return;
+    }
+
+    char * buff = malloc(chunk_size_value + 1);
+    if (!buff) {
+        printf("Unable to allocate chunk buffer. {size: %lu}\n", chunk_size_value);
+        fclose(fp);
+        return;
+    }
+
+    if (fseek(fp, (long)curr_offset, SEEK_SET) != 0) {
+        printf("Unable to seek in file. {file: %s, reason: %s}\n", file_path, strerror(errno));
+        free(buff);
+        fclose(fp);
+        return;
+    }
+
+    size_t bytes_read = 0;
+    /* Only complete chunks are emitted: a short read (chunk not fully written
+     * yet, EOF or read error) ends the loop without advancing curr_offset. */
+    while ((bytes_read = fread(buff, 1, chunk_size_value, fp)) == chunk_size_value) {
+        buff[bytes_read] = '\0';
+        /* Use the real length: the file is read in binary mode, so strlen()
+         * would cut the chunk at the first embedded NUL byte. */
+        flow_file_record * ffr = write_to_flow_file(buff, bytes_read, instance, proc);
+        if (!ffr) {
+            /* Defensive: never hand a NULL record to add_attribute(). */
+            break;
+        }
+        /* 21 = the 20 decimal digits of UINT64_MAX (18446744073709551615) + NUL. */
+        char offset_str[21];
+        snprintf(offset_str, sizeof(offset_str), "%llu", (unsigned long long)curr_offset);
+        add_attribute(ffr, "current offset", offset_str, strlen(offset_str));
+        add_flow_file_record(&ff_list, ffr);
+        /* Same value ftell() would report after a full read, without its -1 error case. */
+        curr_offset += bytes_read;
+    }
+    free(buff);
+    fclose(fp);
+}
+
+flow_file_info log_aggregate(const char * file_path, char delim, uint64_t 
curr_offset) {
+    flow_file_info ff_info;
+    memset(&ff_info, 0, sizeof(ff_info));
+
+    if (!file_path) {
+        return ff_info;
+    }
+
+    char buff[MAX_BYTES_READ + 1];
+    errno = 0;
+    FILE * fp = fopen(file_path, "rb");
+    if (!fp) {
+        printf("Cannot open file: {file: %s, reason: %s}\n", file_path, 
strerror(errno));
+        return ff_info;
+    }
+    fseek(fp, curr_offset, SEEK_SET);
+
+    flow_file_list * ffl = NULL;
+    size_t bytes_read = 0;
+    while ((bytes_read = fread(buff, 1, MAX_BYTES_READ, fp)) > 0) {
+        buff[bytes_read] = '\0';
+        struct token_list tokens = tokenize_string_tailfile(buff, delim);
+        if (tokens.total_bytes > 0) {
+            curr_offset += tokens.total_bytes;
+            fseek(fp, curr_offset, SEEK_SET);
+        }
+
+        token_node * head;
+        for (head = tokens.head; head && head->data; head = head->next) {
+            flow_file_record * ffr = write_to_flow_file(head->data, 
strlen(head->data), instance, proc);
+            char offset[21] = {0};
 
 Review comment:
   Why 21?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to