[ 
https://issues.apache.org/jira/browse/METRON-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15957145#comment-15957145
 ] 

ASF GitHub Bot commented on METRON-822:
---------------------------------------

Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/incubator-metron/pull/509#discussion_r109962549
  
    --- Diff: metron-sensors/fastcapa/src/kafka.c ---
    @@ -21,11 +21,113 @@
     #define POLL_TIMEOUT_MS 1000
     
     /*
    - * data structures required for the kafka client
    + * Passed to all callback functions to help identify the connection.
      */
    -static rd_kafka_t** kaf_h;
    -static rd_kafka_topic_t** kaf_top_h;
    -static int num_conns;
    +struct opaque {
    +    int conn_id;  
    +};
    +
    +/*
    + * Data structures required for the kafka client
    + */
    +static rd_kafka_t **kaf_h;
    +static rd_kafka_topic_t **kaf_top_h;
    +static unsigned num_conns;
    +static FILE *stats_fd;
    +static struct app_stats *kaf_conn_stats;
    +static struct opaque *kaf_opaque;
    +static uint64_t *kaf_keys;
    +
    +/*
    + * A callback executed when an error occurs within the kafka client
    + */
    +static void kaf_error_cb (rd_kafka_t *rk, int err, const char *reason, 
void* UNUSED(opaque))
    +{
    +    LOG_ERROR(USER1, "kafka client unexpected error; conn=%s, error=%s 
[%s] \n", 
    +        rd_kafka_name(rk), rd_kafka_err2str(err), reason);
    +}
    +
    +/*
    + * A callback executed when a broker throttles the producer
    + */
    +static void kaf_throttle_cb (rd_kafka_t *rk, const char *broker_name, 
int32_t broker_id, int throttle_time_ms, void* UNUSED(opaque))
    +{
    +    LOG_ERROR(USER1, "kafka client throttle event; conn=%s, time=%dms 
broker=%s broker_id=%"PRId32" \n", 
    +        rd_kafka_name(rk), throttle_time_ms, broker_name, broker_id);
    +}
    +
    +/*
    + * A callback executed on a fixed frequency (defined by 
`statistics.interval.ms`) 
    + * that provides detailed performance statistics
    + */
    +static int kaf_stats_cb(rd_kafka_t *rk, char *json, size_t 
UNUSED(json_len), void *opaque) 
    +{
    +    int rc;
    +    struct opaque *data = (struct opaque*) opaque;
    +    int conn_id = data->conn_id;   
    +
    +    // update queue depth of this kafka connection
    +    kaf_conn_stats[conn_id].depth = rd_kafka_outq_len(rk);
    +
    +    // TODO this should be handled by a logging lib that can handle faults 
and rolling the output file
    --- End diff --
    
    Created [METRON-828](https://issues.apache.org/jira/browse/METRON-828) to track this.


> Improve Fastcapa Performance
> ----------------------------
>
>                 Key: METRON-822
>                 URL: https://issues.apache.org/jira/browse/METRON-822
>             Project: Metron
>          Issue Type: Improvement
>            Reporter: Nick Allen
>            Assignee: Nick Allen
>
> Improve the performance and scalability of the Fastcapa probe.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to