Hi Team,

I'm trying to write a program which uses libcurl easy handles in a
multithreaded manner, i.e. it makes an HTTP POST request every 10 seconds
using the libh2o event loop powered by libuv worker threads. However, even
though I clean up the handle once the request is done, I see the memory
footprint of the process increasing in top. Is there a better approach?
I'm sharing the sample code below for review.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include <time.h>

#include <curl/curl.h>
#include <uv.h>
#include <jansson.h>

#include "sr71_logging.h"
#include "metrics_common.h"
#include "atomic_common.h"

/* ============================================================
GLOBAL STATE
============================================================ */

/* Master switch: the timer callback and the worker both return without
 * doing anything while this is false. */
bool metrics_publish = false;

/* Passed as both the initial timeout and the repeat interval when the
 * metrics timer is started (presumably milliseconds -- TODO confirm
 * against sr71_uv_timer_start). */
int stats_interval = 5000;
/* Endpoint URL copied in by open_metrics_enpoint() and used as the
 * CURLOPT_URL of the metrics POST. */
char opt_metrics_endp[512] = {0};
/* Declaration of the timer callback that is defined further down. */
static sr71_timer_callback stats_timer_handler;

/* single-flight guards: counters bumped with atomic_incr(); a caller
 * proceeds only when its increment yields 1, otherwise it backs off. */

static int metrics_work_in_progress = 0;
static int token_refresh_in_progress = 0;

/* shared OAuth token cache: process-wide (not thread-local). The token is
 * a heap string produced with strdup() from the auth response; the expiry
 * is an absolute time() value already reduced by a 60 second margin. */

static char *metrics_token = NULL;
static long metrics_token_expiry = 0;

/* Timer handles returned by sr71_uv_timer_start(); they are stopped and
 * cleared in cleanup_curl_resources(). */
static uv_timer_t *metrics_timer = NULL;
static uv_timer_t *midnight_timer = NULL;
static uv_timer_t *third_timer = NULL;

/* ============================================================
THREAD LOCAL CURL STATE
============================================================ */

/* Per-thread easy handles (`__thread` gives every thread its own copy):
 * curl_auth performs the OAuth token request, curl_metrics the metrics
 * POST. */
static __thread CURL *curl_auth = NULL;
static __thread CURL *curl_metrics = NULL;

/* Per-thread request header lists; each is built for a single request
 * and freed again once that request has been performed. */
static __thread struct curl_slist *headers_auth = NULL;
static __thread struct curl_slist *headers_metrics = NULL;

/* ============================================================
CURL GLOBAL INIT
============================================================ */

void initialize_metrics_common(void)
{
curl_global_init(CURL_GLOBAL_DEFAULT);
}

/* ============================================================
THREAD CURL INIT
============================================================ */

/*
 * Lazily creates the calling thread's two easy handles.
 *
 * curl_easy_init() may return NULL; in that case the failure is logged,
 * the handle stays NULL (so callers can detect it) and no option is set
 * on it.
 *
 * Note that curl_easy_reset() reverts a handle's options to their
 * defaults, so any code that resets curl_metrics has to apply the options
 * below again if it wants them to stay in effect.
 */
static void initialize_thread_curl(void)
{
    if (!curl_auth)
    {
        curl_auth = curl_easy_init();

        if (!curl_auth)
            LOG_ERROR("curl_easy_init failed for the auth handle");
    }

    if (!curl_metrics)
    {
        curl_metrics = curl_easy_init();

        if (!curl_metrics)
        {
            LOG_ERROR("curl_easy_init failed for the metrics handle");
            return;
        }

        /* persistent HTTP connection (option values must be passed as long) */

        curl_easy_setopt(curl_metrics, CURLOPT_HTTP_VERSION,
                         (long)CURL_HTTP_VERSION_1_1);

        /* TCP keep alive */

        curl_easy_setopt(curl_metrics, CURLOPT_TCP_KEEPALIVE, 1L);
        curl_easy_setopt(curl_metrics, CURLOPT_TCP_KEEPIDLE, 30L);
        curl_easy_setopt(curl_metrics, CURLOPT_TCP_KEEPINTVL, 15L);
    }
}

/* ============================================================
THREAD CURL CLEANUP
============================================================ */

/*
 * Releases the calling thread's curl handles and any header lists that
 * are still attached to its thread-local pointers.
 *
 * The shared OAuth token is deliberately NOT released here. It is
 * process-wide state rather than thread-local state, and this function
 * runs at the end of every worker cycle: freeing the token here threw the
 * cached token away each time, so the expiry check never had a token to
 * reuse and every cycle had to authenticate again. Whoever tears the
 * process down is responsible for freeing metrics_token.
 */
static void cleanup_thread_curl(void)
{
    if (curl_auth)
    {
        curl_easy_cleanup(curl_auth);
        curl_auth = NULL;
    }

    if (curl_metrics)
    {
        curl_easy_cleanup(curl_metrics);
        curl_metrics = NULL;
    }

    if (headers_auth)
    {
        curl_slist_free_all(headers_auth);
        headers_auth = NULL;
    }

    if (headers_metrics)
    {
        curl_slist_free_all(headers_metrics);
        headers_metrics = NULL;
    }
}

// Function to cleanup CURL resources at the end of the process
void cleanup_curl_resources() {

LOG_DEBUG("cleaned up curl resources using %s.. sleeping for 2 secs to
perform graceful cleanup", __func__);

/**
if (curl_auth) {
    curl_easy_cleanup(curl_auth);
    curl_auth = NULL;
}
if (curl_metrics) {
    curl_easy_cleanup(curl_metrics);
    curl_metrics = NULL;
}
//Comment out below if block if needed to use curl_easy_perform()
if (multi_handle) {
    curl_multi_cleanup(multi_handle);
    multi_handle = NULL;
}
if (headers_auth) {
    curl_slist_free_all(headers_auth);
    headers_auth = NULL;
}
if (headers_metrics) {
    curl_slist_free_all(headers_metrics);
    headers_metrics = NULL;
}

if (global_bearer_token != NULL) {
    free(global_bearer_token);
    global_bearer_token = NULL;
}
**/
if (metrics_timer)
{
    sr71_uv_timer_stop(metrics_timer);
    metrics_timer = NULL;
}

if (midnight_timer)
{
    sr71_uv_timer_stop(midnight_timer);
    midnight_timer = NULL;
}

if (third_timer)
{
    sr71_uv_timer_stop(third_timer);
    third_timer = NULL;
}

cleanup_thread_curl();
//sleep(2);
curl_global_cleanup();  // Calling at the end of program

}
/* ============================================================
TOKEN EXPIRY CHECK
============================================================ */

/*
 * Tells whether a new OAuth token has to be fetched.
 *
 * Returns 1 when no token is cached or when the current time has reached
 * the stored expiry value, 0 otherwise.
 */
static int token_expired(void)
{
    long current = time(NULL);

    return (metrics_token == NULL || current >= metrics_token_expiry) ? 1 : 0;
}

/* ============================================================
AUTH RESPONSE CALLBACK
============================================================ */

static size_t write_callback(void *ptr, size_t size, size_t nmemb, void
*userdata)
{
size_t total = size * nmemb;

json_error_t error;

json_t *json = json_loadb(ptr, total, 0, &error);

if (!json)
{
    LOG_ERROR("Failed to parse OAuth JSON response");
    return total;
}

json_t *token = json_object_get(json, "access_token");
json_t *expires = json_object_get(json, "expires_in");

if (json_is_string(token) && json_is_integer(expires))
{
    const char *value = json_string_value(token);
    long expires_in = json_integer_value(expires);

    if (metrics_token)
        free(metrics_token);

    metrics_token = strdup(value);

    /* refresh 60 seconds before expiry */

    metrics_token_expiry = time(NULL) + expires_in - 60;

    LOG_INFO("OAuth token refreshed successfully. expires_in=%ld", expires_in);
}

json_decref(json);

return total;

}

/* ============================================================
FETCH AUTH TOKEN
============================================================ */

static bool fetch_auth_token(void)
{
if (atomic_incr(&token_refresh_in_progress, 1) != 1)
{
atomic_incr(&token_refresh_in_progress, -1);
LOG_DEBUG("Token refresh already in progress");
return true;
}

const char *tenant = getenv("TENANT");
const char *client = getenv("CLIENT_ID");
const char *secret = getenv("CLIENT_SECRET");

if (!tenant || !client || !secret)
{
    LOG_ERROR("OAuth environment variables missing");
    atomic_incr(&token_refresh_in_progress, -1);
    return false;
}

char auth_url[256];
char auth_data[1024];

snprintf(auth_url,
         sizeof(auth_url),
         "https://login.microsoftonline.com/%s/oauth2/v2.0/token";,
         tenant);

snprintf(auth_data,
         sizeof(auth_data),
         
"grant_type=client_credentials&client_id=%s&client_secret=%s&scope=https://monitor.azure.com/.default";,
         client,
         secret);

curl_easy_reset(curl_auth);

headers_auth = curl_slist_append(NULL,
    "Content-Type: application/x-www-form-urlencoded");

curl_easy_setopt(curl_auth, CURLOPT_URL, auth_url);
curl_easy_setopt(curl_auth, CURLOPT_POSTFIELDS, auth_data);
curl_easy_setopt(curl_auth, CURLOPT_HTTPHEADER, headers_auth);

curl_easy_setopt(curl_auth, CURLOPT_WRITEFUNCTION, write_callback);

curl_easy_setopt(curl_auth, CURLOPT_TIMEOUT_MS, 2000);
curl_easy_setopt(curl_auth, CURLOPT_CONNECTTIMEOUT_MS, 1500);

CURLcode res = curl_easy_perform(curl_auth);

long responseCode = 0;
curl_easy_getinfo(curl_auth, CURLINFO_RESPONSE_CODE, &responseCode);

curl_slist_free_all(headers_auth);
headers_auth = NULL;

atomic_incr(&token_refresh_in_progress, -1);

if (res != CURLE_OK || responseCode != 200)
{
    LOG_ERROR("OAuth request failed curl=%d http=%ld", res, responseCode);
    return false;
}

return true;

}

/* ============================================================
POST METRICS
============================================================ */

static bool post_metrics(char *json_str)
{
if (!metrics_token)
{
LOG_ERROR("Metrics token missing");
return false;
}

curl_easy_reset(curl_metrics);

headers_metrics = NULL;

headers_metrics = curl_slist_append(headers_metrics,
    "Content-Type: application/json");

char auth_header[2048];

snprintf(auth_header,
         sizeof(auth_header),
         "Authorization: Bearer %s",
         metrics_token);

headers_metrics = curl_slist_append(headers_metrics,
    auth_header);

curl_easy_setopt(curl_metrics, CURLOPT_URL, opt_metrics_endp);
curl_easy_setopt(curl_metrics, CURLOPT_POSTFIELDS, json_str);
curl_easy_setopt(curl_metrics, CURLOPT_HTTPHEADER, headers_metrics);

curl_easy_setopt(curl_metrics, CURLOPT_TIMEOUT_MS, 2000);
curl_easy_setopt(curl_metrics, CURLOPT_CONNECTTIMEOUT_MS, 1500);

CURLcode res = curl_easy_perform(curl_metrics);

long responseCode = 0;

curl_easy_getinfo(curl_metrics, CURLINFO_RESPONSE_CODE, &responseCode);

curl_slist_free_all(headers_metrics);
headers_metrics = NULL;

if (res != CURLE_OK)
{
    LOG_ERROR("Metrics POST failed: curl_error=%d (%s)",
              res,
              curl_easy_strerror(res));

    return false;
}

if (responseCode >= 200 && responseCode < 300)
{
    LOG_DEBUG("Metrics POST successful. HTTP=%ld", responseCode);
    return true;
}

LOG_WARN("Metrics POST returned HTTP=%ld", responseCode);

return false;

}

/* ============================================================
WORKER THREAD
============================================================ */

/*
 * Work-queue callback: builds the metrics JSON through the callback
 * stored in req->data, refreshes the OAuth token when needed and posts
 * the document.
 *
 * The timer callback increments metrics_work_in_progress before queueing
 * this work item, so EVERY exit path here has to decrement it again.
 * Previously the early returns for a disabled publisher or a missing
 * callback skipped the decrement, which left the guard stuck and made all
 * later runs report "previous run still active". All paths now leave
 * through the `release` label.
 *
 * NOTE(review): the per-thread handles are torn down after each completed
 * run, which also discards the connections libcurl keeps with the handle;
 * keeping the handles for the lifetime of the thread would allow
 * connection reuse -- confirm which behaviour is wanted.
 */
static void stats_worker_handler(uv_work_t *req)
{
    char *json_str = NULL;
    prepare_json_callback callback;

    if (!metrics_publish)
        goto release;

    callback = (prepare_json_callback)req->data;

    if (!callback)
        goto release;

    initialize_thread_curl();

    if (!curl_auth || !curl_metrics)
    {
        LOG_ERROR("Failed to initialise curl handles");
        goto release;
    }

    json_str = callback();

    if (!json_str)
    {
        LOG_ERROR("Failed to generate metrics JSON");
        goto release;
    }

    /* proactive token refresh */

    if (token_expired())
    {
        LOG_DEBUG("OAuth token expired or near expiry");

        if (!fetch_auth_token())
        {
            LOG_ERROR("Token refresh failed");
            goto free_json;
        }
    }

    if (!post_metrics(json_str))
    {
        LOG_ERROR("Metrics publish failed");
    }
    else
    {
        LOG_DEBUG("Metrics published successfully");
    }

    cleanup_thread_curl();

free_json:
    free(json_str);

release:
    atomic_incr(&metrics_work_in_progress, -1);
}

/* ============================================================
TIMER CALLBACK
============================================================ */

/*
 * Timer callback: hands one metrics run over to the work queue unless
 * publishing is switched off or an earlier run has not finished yet.
 *
 * cb_data is forwarded unchanged as the work item's data pointer.
 *
 * NOTE(review): the work item lives on this function's stack and its size
 * is passed along; presumably sr71_uv_queue_work copies it -- confirm.
 */
static void stats_timer_handler(void *cb_data)
{
    if (!metrics_publish)
        return;

    if (atomic_incr(&metrics_work_in_progress, 1) == 1)
    {
        /* we own the single-flight slot: queue the run */
        uv_work_t work;

        work.data = cb_data;

        sr71_uv_queue_work(&work,
                           sizeof(work),
                           (sr71_queue_handler *)stats_worker_handler);
        return;
    }

    /* somebody else holds the slot: undo our increment and skip */
    atomic_incr(&metrics_work_in_progress, -1);
    LOG_WARN("Skipping metrics run: previous run still active");
}

/*
 * Timer callback that resets the in-memory process statistics and, when a
 * database connection is available, the persisted statistics as well.
 *
 * arg is unused.
 *
 * This handler no longer starts a further 24-hour timer. The timer that
 * schedule_midnight_reset() starts already carries a 24-hour repeat
 * interval, so re-arming here added one extra pending timer per firing:
 * the reset ran once more every day than the day before, and each new
 * handle overwrote third_timer so the earlier ones could never be
 * stopped.
 */
static void midnight_reset_handler(void *arg)
{
    (void)arg;

    LOG_DEBUG("Performing midnight reset of metrics (UTC)");

    if (process_array_stats_enabled)
        shares_process_stats_arr_reset(&process_stats_array);
    else
        shares_process_stats_reset(&process_stats);

    if (dbconn != NULL)
        shares_process_stats_reset_db();
    else
        LOG_ERROR("Metrics DB reset failed: dbconn NULL");
}

/*
 * Computes how many seconds are left until the next 00:00:00 UTC, logs
 * the current UTC time together with the result and returns the result.
 */
static int seconds_until_midnight_utc()
{
    struct tm now_utc;
    time_t stamp = time(NULL);

    gmtime_r(&stamp, &now_utc);

    /* seconds already elapsed since the start of the current UTC day */
    int elapsed = now_utc.tm_sec
                + now_utc.tm_min * 60
                + now_utc.tm_hour * 3600;

    int remaining = 24 * 3600 - elapsed;

    LOG_INFO(
        "UTC time %02d:%02d:%02d | seconds_until_midnight=%d",
        now_utc.tm_hour,
        now_utc.tm_min,
        now_utc.tm_sec,
        remaining
    );

    return remaining;
}

/*
 * Starts the midnight reset timer: the first firing is timed for the next
 * UTC midnight and a 24-hour value is passed as the repeat argument. The
 * returned handle is kept in midnight_timer.
 */
static void schedule_midnight_reset(void)
{
    const int secs_to_midnight = seconds_until_midnight_utc();

    LOG_DEBUG("Scheduling midnight reset in %d seconds", secs_to_midnight);

    midnight_timer = sr71_uv_timer_start(midnight_reset_handler,
                                         secs_to_midnight * 1000,
                                         24 * 3600 * 1000,
                                         NULL);
}

/* ============================================================
PUBLIC ENTRY
============================================================ */

/*
 * Stores the metrics endpoint URL and starts the periodic metrics timer.
 *
 * metrics_ep: NUL-terminated endpoint URL; copied into opt_metrics_endp.
 * callback:   producer of the metrics JSON; handed to the timer as its
 *             callback data.
 *
 * Returns 0 on success and -1 when an argument is missing, when the URL
 * does not fit opt_metrics_endp, or when the timer could not be started.
 *
 * An over-long URL used to be truncated silently and the shortened text
 * was then used as the request URL; it is now rejected before the stored
 * endpoint is touched.
 */
int open_metrics_enpoint(char *metrics_ep,
prepare_json_callback callback)
{
    size_t ep_len;

    if (!metrics_ep || !callback)
    {
        LOG_ERROR("Invalid metrics endpoint or callback");
        return -1;
    }

    ep_len = strlen(metrics_ep);

    if (ep_len >= sizeof(opt_metrics_endp))
    {
        LOG_ERROR("Metrics endpoint too long: %zu bytes (max %zu)",
                  ep_len,
                  sizeof(opt_metrics_endp) - 1);
        return -1;
    }

    /* length checked above; the copy includes the terminating NUL */
    memcpy(opt_metrics_endp, metrics_ep, ep_len + 1);

    LOG_INFO("Metrics endpoint configured: %s", opt_metrics_endp);

    metrics_timer = sr71_uv_timer_start(
        stats_timer_handler,
        stats_interval,
        stats_interval,
        (void *)callback
    );

    /* assumes sr71_uv_timer_start reports failure with NULL -- TODO confirm */
    if (!metrics_timer)
    {
        LOG_ERROR("Failed to start metrics timer");
        return -1;
    }

    return 0;
}
Regards,
Sasmit Utkarsh
+91-7674022625
-- 
Unsubscribe: https://lists.haxx.se/mailman/listinfo/curl-library
Etiquette:   https://curl.se/mail/etiquette.html

Reply via email to