bakaid commented on a change in pull request #674: Minificpp 1007 - ECU C2 integration. URL: https://github.com/apache/nifi-minifi-cpp/pull/674#discussion_r348631248
########## File path: nanofi/src/processors/file_input.c ########## @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +#include <unistd.h> +#include <stdlib.h> +#include <stdint.h> +#include <stdio.h> +#include <errno.h> +#include <string.h> +#include <sys/stat.h> +#include <processors/file_input.h> + +void initialize_file_input(file_input_context_t * ctx) { + pthread_mutex_init(&ctx->stop_mutex, NULL); + pthread_cond_init(&ctx->stop_cond, NULL); +} + +void start_file_input(file_input_context_t * ctx) { + pthread_mutex_lock(&ctx->stop_mutex); + ctx->stop = 0; + pthread_mutex_unlock(&ctx->stop_mutex); +} + +void stop_file_input(file_input_context_t * ctx) { + pthread_mutex_lock(&ctx->stop_mutex); + ctx->stop = 1; + pthread_cond_broadcast(&ctx->stop_cond); + pthread_mutex_unlock(&ctx->stop_mutex); +} + +int validate_file_path(const char * file_path) { + if (!file_path) { + return -1; + } + struct stat stats; + int ret = stat(file_path, &stats); + + if (ret == -1) { + printf("Error occurred while getting file status {file: %s, error: %s}\n", file_path, strerror(errno)); + return -1; + } + + if (S_ISDIR(stats.st_mode)){ + printf("Error: %s is a directory!\n", file_path); + return -1; + } + return 0; +} 
+ +int validate_file_delimiter(const char * delimiter_str, char * delim) { + if (!delimiter_str || strlen(delimiter_str) == 0) { + return -1; + } + + char delimiter[3]; + strncpy(delimiter, delimiter_str, 2); + delimiter[2] = '\0'; + *delim = delimiter[0]; + + if (*delim == '\\' && strlen(delimiter) > 1) { + switch (delimiter[1]) { + case 'r': + *delim = '\r'; + break; + case 'n': + *delim = '\n'; + break; + case 't': + *delim = '\t'; + break; + case '\\': + *delim = '\\'; + break; + } + } + return 0; +} + +int str_to_uint(const char * input_str, uint64_t * out) { + if (!input_str) { + return -1; + } + errno = 0; + *out = (uint64_t)(strtoul(input_str, NULL, 10)); + if (errno != 0) { + return -1; + } + return 0; +} + +int validate_file_properties(struct file_input_context * context) { + if (!context || !context->input_properties) { + return -1; + } + + properties_t * props = context->input_properties; + properties_t * el = NULL; + HASH_FIND_STR(props, "file_path", el); + if (!el) { + return -1; + } + char * file_path = el->value; + if (!file_path) { + return -1; + } + + properties_t * cs = NULL; + properties_t * dl = NULL; + HASH_FIND_STR(props, "chunk_size", cs); + HASH_FIND_STR(props, "delimiter", dl); + if (dl && cs) { + return -1; + } + + if (!dl && !cs) { + return -1; + } + + char * chunk_size_str = NULL; + char * delimiter = NULL; + + if (cs) { + chunk_size_str = cs->value; + } + if (dl) { + delimiter = dl->value; + } + + el = NULL; + HASH_FIND_STR(props, "tail_frequency", el); + if (!el) { + return -1; + } + + char * tail_frequency_str = el->value; + + uint64_t chunk_size_uint = 0; + uint64_t tail_frequency_uint = 0; + char delim = '\0'; + + if ((validate_file_path(file_path) < 0) + || (dl && validate_file_delimiter(delimiter, &delim) < 0) + || (cs && str_to_uint(chunk_size_str, &chunk_size_uint) < 0) + || (str_to_uint(tail_frequency_str, &tail_frequency_uint) < 0) + || (dl && delim == '\0')) { + return -1; + } + + //populate file input context with 
parameters + size_t file_path_len = strlen(file_path); + char * fp = context->file_path; + if (fp) free(fp); + context->file_path = (char *)malloc(file_path_len + 1); + strcpy(context->file_path, file_path); + + context->tail_frequency = tail_frequency_uint; + + if (cs) + context->chunk_size = chunk_size_uint; + if (dl) + context->delimiter = delim; + return 0; +} + +int set_file_input_property(file_input_context_t * ctx, const char * name, const char * value) { + return add_property(&ctx->input_properties, name, value); +} + +message_t * create_message(char * data, size_t len, properties_t * props) { + attribute_set as = prepare_attributes(props); + return prepare_message(data, len, as); +} + +void enqueue_chunk(file_input_context_t * ctx, data_buff_t chunk) { + if (chunk.len > 0 && chunk.data) { + size_t fp_len = strlen(ctx->file_path); + chunk.file_path = (char *)malloc(fp_len + 1); + strcpy(chunk.file_path, ctx->file_path); + + int length = snprintf(NULL, 0, "%llu", ctx->current_offset); + char * offset_str = (char *)malloc(length + 1); + snprintf(offset_str, length + 1, "%llu", ctx->current_offset); + + properties_t * props = NULL; + add_property(&props, "current offset", offset_str); + add_property(&props, "file path", chunk.file_path); + + message_t * msg = create_message(chunk.data, chunk.len, props); + free(chunk.data); + free(offset_str); + free(chunk.file_path); + free_properties(props); + size_t bytes_enqueued = enqueue_message(ctx->msg_queue, msg); + if (bytes_enqueued < chunk.len) { + //we were not able to enqueue all of chunk.len bytes + //therefore, update the current offset + ctx->current_offset -= (chunk.len - bytes_enqueued); + } + } +} + +void read_file_chunk(file_input_context_t * ctx) { + errno = 0; + FILE * fp = fopen(ctx->file_path, "rb"); + if (!fp) { + printf("Error opening file %s, error: %s\n", ctx->file_path, strerror(errno)); + return; + } + size_t bytes_read = 0; + char * data = (char *)malloc((ctx->chunk_size + 1) * sizeof(char)); + 
fseek(fp, ctx->current_offset, SEEK_SET); + while ((bytes_read = fread(data, 1, ctx->chunk_size, fp)) > 0) { + if (bytes_read < ctx->chunk_size) { + break; + } + data[ctx->chunk_size] = '\0'; + data_buff_t buff; + buff.data = (char *)malloc(strlen(data) + 1); + memcpy(buff.data, data, strlen(data) + 1); + buff.len = strlen(data); + ctx->current_offset = ftell(fp); + size_t old_offset = ctx->current_offset; + enqueue_chunk(ctx, buff); + if (old_offset > ctx->current_offset) { + //we were not able to ship all of the chunk read from + //the file. let's stop reading and come back later + break; + } + fseek(fp, ctx->current_offset, SEEK_SET); + } + free(data); + fclose(fp); +} + + +void read_file_delim(file_input_context_t * ctx) { + FILE * fp = fopen(ctx->file_path, "rb"); + errno = 0; + if (!fp) { + printf("Unable to open file. {file: %s, reason: %s}\n", ctx->file_path, strerror(errno)); + return; + } + + char data[4096 + 1]; + memset(data, 0, 4097); + fseek(fp, ctx->current_offset, SEEK_SET); + size_t bytes_read = 0; + while ((bytes_read = fread(data, 1, 4096, fp)) > 0) { + data[bytes_read] = '\0'; + const char * begin = data; + const char * end = NULL; + + while ((end = strchr(begin, ctx->delimiter))) { Review comment: If there is a `\0` in this file (which I think is valid, since we could be reading a delimited binary file), `strchr` will stop at it and the rest of the buffer will never be scanned for delimiters. I suggest you don't treat the buffer as a C string, and use `memchr` (bounded by the number of bytes read) instead of `strchr`. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services
