szaszm commented on a change in pull request #674: Minificpp 1007 - ECU C2 
integration.
URL: https://github.com/apache/nifi-minifi-cpp/pull/674#discussion_r371920476
 
 

 ##########
 File path: nanofi/src/processors/file_input.c
 ##########
 @@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+    *
+    *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+#ifndef WIN32
+#include <unistd.h>
+#endif
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <stdlib.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <errno.h>
+#include <string.h>
+#include <core/string_utils.h>
+#include <core/log.h>
+#include <processors/file_input.h>
+
+void initialize_file_input(file_input_context_t * ctx) {
+    initialize_lock(&ctx->stop_mutex);
+    initialize_cv(&ctx->stop_cond, NULL);
+}
+
+void start_file_input(file_input_context_t * ctx) {
+    acquire_lock(&ctx->stop_mutex);
+    ctx->stop = 0;
+    release_lock(&ctx->stop_mutex);
+}
+
+void stop_file_input(file_input_context_t * ctx) {
+       acquire_lock(&ctx->stop_mutex);
+    ctx->stop = 1;
+    condition_variable_broadcast(&ctx->stop_cond);
+    release_lock(&ctx->stop_mutex);
+}
+
+#ifndef WIN32
+int validate_file_path(const char * file_path) {
+    if (!file_path) {
+        return -1;
+    }
+
+       struct stat stats;
+       int ret = stat(file_path, &stats);
+
+    if (ret == -1) {
+        logc(err, "Error occurred while getting file status {file: %s, error: 
%s}\n", file_path, strerror(errno));
+        return -1;
+    }
+
+    if (S_ISDIR(stats.st_mode)){
+        logc(err, "Error: %s is a directory!\n", file_path);
+        return -1;
+    }
+    return 0;
+}
+#else
+int validate_file_path(const char * file_path) {
+    if (!file_path) {
+        return -1;
+    }
+    HANDLE hFind;
+    WIN32_FIND_DATA fd;
+
+    hFind = FindFirstFile(file_path, &fd);
+    if (hFind == INVALID_HANDLE_VALUE || fd.dwFileAttributes == 
FILE_ATTRIBUTE_DIRECTORY) {
+        return -1;
+    }
+
+    return 0;
+}
+#endif
+
/*
 * Parses a delimiter property value into a single character.
 *
 * Only the first one or two characters of delimiter_str are examined. A leading
 * backslash followed by 'r', 'n', 't' or '\\' is translated into '\r', '\n',
 * '\t' or '\\' respectively. Any other input yields its first character, so a
 * lone backslash or an unrecognised escape such as "\q" yields '\\'.
 *
 * Returns 0 and stores the character in *delim on success; returns -1 (leaving
 * *delim untouched) when delimiter_str is NULL or empty, or delim is NULL.
 */
int validate_file_delimiter(const char * delimiter_str, char * delim) {
    if (!delimiter_str || delimiter_str[0] == '\0' || !delim) {
        return -1;
    }

    char result = delimiter_str[0];
    if (result == '\\') {
        /* Reading index 1 is safe: index 0 is non-NUL, so index 1 is at worst
         * the string terminator. */
        switch (delimiter_str[1]) {
        case 'r':
            result = '\r';
            break;
        case 'n':
            result = '\n';
            break;
        case 't':
            result = '\t';
            break;
        case '\\':
            result = '\\';
            break;
        default:
            /* lone backslash or unknown escape: keep the backslash itself */
            break;
        }
    }
    *delim = result;
    return 0;
}
+
+int validate_file_properties(struct file_input_context * context) {
+    if (!context || !context->input_properties) {
+        return -1;
+    }
+
+    properties_t * props = context->input_properties;
+    properties_t * el = NULL;
+    HASH_FIND_STR(props, "file_path", el);
+    if (!el) {
+        return -1;
+    }
+    char * file_path = el->value;
+    if (!file_path) {
+        return -1;
+    }
+
+    properties_t * cs = NULL;
+    properties_t * dl = NULL;
+    HASH_FIND_STR(props, "chunk_size_bytes", cs);
+    HASH_FIND_STR(props, "delimiter", dl);
+    if (dl && cs) {
+        return -1;
+    }
+
+    if (!dl && !cs) {
+        return -1;
+    }
+
+    char * chunk_size_str = NULL;
+    char * delimiter = NULL;
+
+    if (cs) {
+        chunk_size_str = cs->value;
+    }
+    if (dl) {
+        delimiter = dl->value;
+    }
+
+    el = NULL;
+    HASH_FIND_STR(props, "tail_frequency_ms", el);
+    if (!el) {
+        return -1;
+    }
+
+    char * tail_frequency_str = el->value;
+
+    uint64_t chunk_size_uint = 0;
+    uint64_t tail_frequency_uint = 0;
+    char delim = '\0';
+
+    if ((validate_file_path(file_path) < 0)
+        || (dl && validate_file_delimiter(delimiter, &delim) < 0)
+        || (cs && str_to_uint(chunk_size_str, &chunk_size_uint) < 0)
+        || (str_to_uint(tail_frequency_str, &tail_frequency_uint) < 0)
+        || (dl && delim == '\0')) {
+        return -1;
+    }
+
+    //populate file input context with parameters
+    size_t file_path_len = strlen(file_path);
+    char * fp = context->file_path;
+    if (fp) free(fp);
+    context->file_path = (char *)malloc(file_path_len + 1);
+    strcpy(context->file_path, file_path);
+
+    context->tail_frequency_ms = tail_frequency_uint;
+
+    if (cs)
+     context->chunk_size = chunk_size_uint;
+    if (dl)
+        context->delimiter = delim;
+    return 0;
+}
+
+int set_file_input_property(file_input_context_t * ctx, const char * name, 
const char * value) {
+    return add_property(&ctx->input_properties, name, value);
+}
+
+message_t * create_message(char * data, size_t len, properties_t * props) {
+    attribute_set as = prepare_attributes(props);
+    return prepare_message(data, len, as);
+}
+
+size_t enqueue_chunk(file_input_context_t * ctx, data_buff_t chunk) {
+       size_t bytes_enqueued = 0;
+    if (chunk.len > 0 && chunk.data) {
+        size_t fp_len = strlen(ctx->file_path);
+        chunk.file_path = (char *)malloc(fp_len + 1);
+        strcpy(chunk.file_path, ctx->file_path);
+
+        int length = snprintf(NULL, 0, "%llu", ctx->current_offset);
+        char * offset_str = (char *)malloc(length + 1);
+        snprintf(offset_str, length + 1, "%llu", ctx->current_offset);
+
+        properties_t * props = NULL;
+        add_property(&props, "current offset", offset_str);
+        add_property(&props, "file path", chunk.file_path);
+
+        message_t * msg = create_message(chunk.data, chunk.len, props);
+        free(chunk.data);
+        free(offset_str);
+        free(chunk.file_path);
+        free_properties(props);
+        bytes_enqueued = enqueue_message(ctx->msg_queue, msg);
+        if (bytes_enqueued < chunk.len) {
+            //we were not able to enqueue all of chunk.len bytes
+            //therefore, update the current offset
+            ctx->current_offset -= (chunk.len - bytes_enqueued);
+        }
+    }
+    return bytes_enqueued;
+}
+
+void read_file_chunk(file_input_context_t * ctx) {
+    errno = 0;
+    FILE * fp = fopen(ctx->file_path, "rb");
+    if (!fp) {
+        logc(err, "Error opening file %s, error: %s\n", ctx->file_path, 
strerror(errno));
+        return;
+    }
+    size_t bytes_read = 0;
+    char * data = (char *)malloc((ctx->chunk_size) * sizeof(char));
+    fseek(fp, ctx->current_offset, SEEK_SET);
+    while ((bytes_read = fread(data, 1, ctx->chunk_size, fp)) > 0) {
+        if (bytes_read < ctx->chunk_size) {
+            break;
+        }
 
 Review comment:
   If we reject incomplete chunks, checking for `fread(data, 1, 
ctx->chunk_size, fp) == ctx->chunk_size` in the `while` condition would result 
in the same behavior but with less code, and `bytes_read` would be redundant.
   Do we really want to reject incomplete chunks?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to