Script 'mail_helper' called by obssrc Hello community, here is the log from the commit of package aws-c-mqtt for openSUSE:Factory checked in at 2026-03-20 21:20:18 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Comparing /work/SRC/openSUSE:Factory/aws-c-mqtt (Old) and /work/SRC/openSUSE:Factory/.aws-c-mqtt.new.8177 (New) ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "aws-c-mqtt" Fri Mar 20 21:20:18 2026 rev:16 rq:1341438 version:0.15.0 Changes: -------- --- /work/SRC/openSUSE:Factory/aws-c-mqtt/aws-c-mqtt.changes 2026-02-23 16:14:58.747344560 +0100 +++ /work/SRC/openSUSE:Factory/.aws-c-mqtt.new.8177/aws-c-mqtt.changes 2026-03-20 21:20:54.524251398 +0100 @@ -1,0 +2,6 @@ +Tue Mar 17 13:24:58 UTC 2026 - John Paul Adrian Glaubitz <[email protected]> + +- Update to version 0.15.0 + * Manual Puback for MQTT5 by @sbSteveK in (#417) + +------------------------------------------------------------------- Old: ---- v0.14.0.tar.gz New: ---- v0.15.0.tar.gz ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Other differences: ------------------ ++++++ aws-c-mqtt.spec ++++++ --- /var/tmp/diff_new_pack.JjpySl/_old 2026-03-20 21:20:55.200279563 +0100 +++ /var/tmp/diff_new_pack.JjpySl/_new 2026-03-20 21:20:55.200279563 +0100 @@ -18,7 +18,7 @@ %global library_version 1_0_0 Name: aws-c-mqtt -Version: 0.14.0 +Version: 0.15.0 Release: 0 Summary: AWS C99 implementation of the MQTT 3.1.1 specification License: Apache-2.0 ++++++ v0.14.0.tar.gz -> v0.15.0.tar.gz ++++++ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/aws-c-mqtt-0.14.0/include/aws/mqtt/private/client_impl_shared.h new/aws-c-mqtt-0.15.0/include/aws/mqtt/private/client_impl_shared.h --- old/aws-c-mqtt-0.14.0/include/aws/mqtt/private/client_impl_shared.h 2026-02-17 18:31:31.000000000 +0100 +++ new/aws-c-mqtt-0.15.0/include/aws/mqtt/private/client_impl_shared.h 2026-03-11 19:20:15.000000000 +0100 @@ -141,6 +141,10 @@ AWS_MQTT_API bool aws_mqtt_compare_uint16_t_eq(const void *a, const void *b); +AWS_MQTT_API uint64_t aws_mqtt_hash_uint64_t(const void *item); + +AWS_MQTT_API bool aws_mqtt_compare_uint64_t_eq(const void *a, const void *b); + AWS_MQTT_API bool aws_mqtt_byte_cursor_hash_equality(const void *a, const void *b); AWS_MQTT_API struct aws_event_loop *aws_mqtt_client_connection_get_event_loop( diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/aws-c-mqtt-0.14.0/include/aws/mqtt/private/v5/mqtt5_client_impl.h new/aws-c-mqtt-0.15.0/include/aws/mqtt/private/v5/mqtt5_client_impl.h --- old/aws-c-mqtt-0.14.0/include/aws/mqtt/private/v5/mqtt5_client_impl.h 2026-02-17 18:31:31.000000000 +0100 +++ new/aws-c-mqtt-0.15.0/include/aws/mqtt/private/v5/mqtt5_client_impl.h 2026-03-11 19:20:15.000000000 +0100 @@ -252,6 +252,17 @@ */ aws_mqtt5_packet_id_t next_mqtt_packet_id; + /* + * One more than the most recently used control packet id. + */ + uint64_t next_mqtt5_puback_control_id; + /* These tables are used for fast search of a packet_id and control_id pair. They both set an + * aws_mqtt5_manual_puback_entry as its value */ + struct aws_hash_table manual_puback_packet_id_table; + struct aws_hash_table manual_puback_control_id_table; + /* This table contains control ids known to be cancelled due to client disconnect for better failure messaging */ + struct aws_hash_table manual_puback_cancelled_control_id_table; + struct aws_linked_list queued_operations; struct aws_mqtt5_operation *current_operation; struct aws_hash_table unacked_operations_table; diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/aws-c-mqtt-0.14.0/include/aws/mqtt/private/v5/mqtt5_options_storage.h new/aws-c-mqtt-0.15.0/include/aws/mqtt/private/v5/mqtt5_options_storage.h --- old/aws-c-mqtt-0.14.0/include/aws/mqtt/private/v5/mqtt5_options_storage.h 2026-02-17 18:31:31.000000000 +0100 +++ new/aws-c-mqtt-0.15.0/include/aws/mqtt/private/v5/mqtt5_options_storage.h 2026-03-11 19:20:15.000000000 +0100 @@ -101,6 +101,8 @@ struct aws_allocator *allocator; struct aws_mqtt5_packet_puback_storage options_storage; + + struct aws_mqtt5_manual_puback_completion_options completion_options; }; struct aws_mqtt5_operation_disconnect { @@ -272,7 +274,8 @@ AWS_MQTT_API struct aws_mqtt5_operation_puback *aws_mqtt5_operation_puback_new( struct aws_allocator *allocator, - const struct aws_mqtt5_packet_puback_view *puback_options); + const struct aws_mqtt5_packet_puback_view *puback_options, + const struct aws_mqtt5_manual_puback_completion_options *completion_options); AWS_MQTT_API void aws_mqtt5_packet_puback_view_log( const struct aws_mqtt5_packet_puback_view *puback_view, diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/aws-c-mqtt-0.14.0/include/aws/mqtt/v5/mqtt5_client.h new/aws-c-mqtt-0.15.0/include/aws/mqtt/v5/mqtt5_client.h --- old/aws-c-mqtt-0.14.0/include/aws/mqtt/v5/mqtt5_client.h 2026-02-17 18:31:31.000000000 +0100 +++ new/aws-c-mqtt-0.15.0/include/aws/mqtt/v5/mqtt5_client.h 2026-03-11 19:20:15.000000000 +0100 @@ -295,6 +295,16 @@ void *complete_ctx); /** + * Signature of callback invoked when a manual PUBACK operation completes. + * + * @param puback_result result of the PUBACK operation + * @param completion_user_data user data passed in with the completion options + */ +typedef void(aws_mqtt5_manual_puback_completion_fn)( + enum aws_mqtt5_manual_puback_result puback_result, + void *completion_user_data); + +/** * Signature of callback to invoke on Subscribe success/failure. */ typedef void(aws_mqtt5_subscribe_completion_fn)( @@ -346,6 +356,14 @@ }; /** + * Completion options for the manual puback operation + */ +struct aws_mqtt5_manual_puback_completion_options { + aws_mqtt5_manual_puback_completion_fn *completion_callback; + void *completion_user_data; +}; + +/** * Completion options for the Subscribe operation */ struct aws_mqtt5_subscribe_completion_options { @@ -739,6 +757,35 @@ const struct aws_mqtt5_publish_completion_options *completion_options); /** + * Takes manual control of PUBACK for the given PUBLISH packet. + * + * This MUST only be called from within the publish received callback. A return value of 0 indicates an + * invalid control id. + * + * @param client mqtt5 client that received the PUBLISH packet to take manual PUBACK control from. + * @param publish_view the view of the PUBLISH packet that PUBACK control is taken from. + * @return puback_control_id of the PUBLISH packet. This can be used to schedule a PUBACK for the PUBLISH packet + * when used with the aws_mqtt5_client_invoke_puback function call. + */ +AWS_MQTT_API uint64_t aws_mqtt5_client_acquire_puback( + struct aws_mqtt5_client *client, + const struct aws_mqtt5_packet_publish_view *publish_view); + +/** + * Send PUBACK for provided control id. Callback in completion_options will be invoked with an + * aws_mqtt5_manual_puback_result once a PUBACK operation has been completed. + * + * @param client mqtt5 client to queue a puback for + * @param puback_control_id Control ID of aws_mqtt5_manual_puback_entry to send to broker/server + * @return success/failure of starting the manual PUBACK operation. + */ +AWS_MQTT_API +int aws_mqtt5_client_invoke_puback( + struct aws_mqtt5_client *client, + uint64_t puback_control_id, + const struct aws_mqtt5_manual_puback_completion_options *completion_options); + +/** * Queues a Subscribe operation in an mqtt5 client * * @param client mqtt5 client to queue a Subscribe for diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/aws-c-mqtt-0.14.0/include/aws/mqtt/v5/mqtt5_types.h new/aws-c-mqtt-0.15.0/include/aws/mqtt/v5/mqtt5_types.h --- old/aws-c-mqtt-0.14.0/include/aws/mqtt/v5/mqtt5_types.h 2026-02-17 18:31:31.000000000 +0100 +++ new/aws-c-mqtt-0.15.0/include/aws/mqtt/v5/mqtt5_types.h 2026-03-11 19:20:15.000000000 +0100 @@ -10,6 +10,7 @@ #include <aws/common/array_list.h> #include <aws/common/byte_buf.h> +#include <aws/common/ref_count.h> AWS_PUSH_SANE_WARNING_LEVEL @@ -140,6 +141,17 @@ }; /** + * Result for manual PUBACK operations. + * + */ +enum aws_mqtt5_manual_puback_result { + AWS_MQTT5_MPR_SUCCESS = 0, + AWS_MQTT5_MPR_PUBACK_CANCELLED = 1, + AWS_MQTT5_MPR_PUBACK_INVALID = 2, + AWS_MQTT5_MPR_CRT_FAILURE = 3, +}; + +/** * Reason code inside SUBACK packet payloads. * Enum values match mqtt spec encoding values. * @@ -444,6 +456,19 @@ }; /** + * This is used to track which PUBLISH packets a user has taken manual PUBACK control from. + */ +struct aws_mqtt5_manual_puback_entry { + struct aws_allocator *allocator; + struct aws_ref_count ref_count; + + /* control id for internal tracking */ + uint64_t puback_control_id; + /* packet_id of controlled publish */ + uint16_t packet_id; +}; + +/** * Read-only snapshot of a SUBACK packet * * https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901171 diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/aws-c-mqtt-0.14.0/source/client_impl_shared.c new/aws-c-mqtt-0.15.0/source/client_impl_shared.c --- old/aws-c-mqtt-0.14.0/source/client_impl_shared.c 2026-02-17 18:31:31.000000000 +0100 +++ new/aws-c-mqtt-0.15.0/source/client_impl_shared.c 2026-03-11 19:20:15.000000000 +0100 @@ -217,6 +217,14 @@ return *(uint16_t *)a == *(uint16_t *)b; } +uint64_t aws_mqtt_hash_uint64_t(const void *item) { + return *(uint64_t *)item; +} + +bool aws_mqtt_compare_uint64_t_eq(const void *a, const void *b) { + return *(uint64_t *)a == *(uint64_t *)b; +} + bool aws_mqtt_byte_cursor_hash_equality(const void *a, const void *b) { const struct aws_byte_cursor *a_cursor = a; const struct aws_byte_cursor *b_cursor = b; diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/aws-c-mqtt-0.14.0/source/v5/mqtt5_client.c new/aws-c-mqtt-0.15.0/source/v5/mqtt5_client.c --- old/aws-c-mqtt-0.14.0/source/v5/mqtt5_client.c 2026-02-17 18:31:31.000000000 +0100 +++ new/aws-c-mqtt-0.15.0/source/v5/mqtt5_client.c 2026-03-11 19:20:15.000000000 +0100 @@ -590,6 +590,53 @@ s_reevaluate_service_task(client); } +/* This is used to move the aws_mqtt5_manual_puback_entry stored in the active table to cancelled table. We don't simply + * clear out the active table because we need to keep the entries in memory until we are fully done with them. */ +static int s_manual_puback_transfer(void *context, struct aws_hash_element *element) { + struct aws_hash_table *manual_puback_cancelled_control_id_table = context; + + struct aws_mqtt5_manual_puback_entry *manual_puback_entry = element->value; + // Add the control id to the destination table + if (aws_hash_table_put(manual_puback_cancelled_control_id_table, element->key, element->value, NULL)) { + return AWS_COMMON_HASH_TABLE_ITER_ERROR; + } + // incref the ref_count because when this entry is removed from the original set it will decref. + aws_ref_count_acquire(&manual_puback_entry->ref_count); + + // We simply continue here and will clear (and thus decref) after we finish iterating. + return AWS_COMMON_HASH_TABLE_ITER_CONTINUE; +} + +/* This is called when the manual puback entry is removed from a hashset to properly decref on removal */ +static void s_aws_mqtt5_manual_puback_entry_decref(void *value) { + struct aws_mqtt5_manual_puback_entry *manual_puback_entry = value; + if (manual_puback_entry != NULL) { + aws_ref_count_release(&manual_puback_entry->ref_count); + } +} + +/* When a disconnect or stop occurs all Manual Pubacks become invalid. We clear packet ids which may be reused + * by the server and transfer manual puback entries from the active table to the cancelled table + * to provide better communication if the user attempts to invoke a PUBACK they think they have control over. */ +static void s_aws_mqtt5_reset_manual_puback_tables( + struct aws_mqtt5_client_operational_state *client_operational_state) { + size_t count = aws_hash_table_get_entry_count(&client_operational_state->manual_puback_control_id_table); + if (count > 0) { + AWS_LOGF_DEBUG( + AWS_LS_MQTT5_CLIENT, + "id=%p: Clearing %zu PUBACKs under user control. Previously controlled PUBACKs are no longer valid and " + "have been cancelled.", + (void *)client_operational_state->client, + count); + aws_hash_table_clear(&client_operational_state->manual_puback_packet_id_table); + aws_hash_table_foreach( + &client_operational_state->manual_puback_control_id_table, + s_manual_puback_transfer, + &client_operational_state->manual_puback_cancelled_control_id_table); + aws_hash_table_clear(&client_operational_state->manual_puback_control_id_table); + } +} + static void s_aws_mqtt5_client_operational_state_reset( struct aws_mqtt5_client_operational_state *client_operational_state, int completion_error_code, @@ -604,9 +651,13 @@ if (is_final) { aws_priority_queue_clean_up(&client_operational_state->operations_by_ack_timeout); aws_hash_table_clean_up(&client_operational_state->unacked_operations_table); + aws_hash_table_clean_up(&client_operational_state->manual_puback_control_id_table); + aws_hash_table_clean_up(&client_operational_state->manual_puback_packet_id_table); + aws_hash_table_clean_up(&client_operational_state->manual_puback_cancelled_control_id_table); } else { aws_priority_queue_clear(&client->operational_state.operations_by_ack_timeout); aws_hash_table_clear(&client_operational_state->unacked_operations_table); + s_aws_mqtt5_reset_manual_puback_tables(client_operational_state); } } @@ -617,7 +668,8 @@ s_aws_mqtt5_client_operational_state_reset(&client->operational_state, AWS_ERROR_MQTT5_USER_REQUESTED_STOP, false); - /* Stop works as a complete session wipe, and so the next time we connect, we want it to be clean */ + /* Stop works as a complete session wipe, and so the next time we connect, we want it to be clean unless + * client session behavior type is set to AWS_MQTT5_CSBT_REJOIN_ALWAYS. */ client->has_connected_successfully = false; s_aws_mqtt5_client_emit_stopped_lifecycle_event(client); @@ -1811,7 +1863,10 @@ aws_linked_list_insert_before(current_node, node); } -static int s_aws_mqtt5_client_queue_puback(struct aws_mqtt5_client *client, uint16_t packet_id) { +static int s_aws_mqtt5_client_queue_puback( + struct aws_mqtt5_client *client, + uint16_t packet_id, + const struct aws_mqtt5_manual_puback_completion_options *completion_options) { AWS_PRECONDITION(client != NULL); const struct aws_mqtt5_packet_puback_view puback_view = { @@ -1819,7 +1874,8 @@ .reason_code = AWS_MQTT5_PARC_SUCCESS, }; - struct aws_mqtt5_operation_puback *puback_op = aws_mqtt5_operation_puback_new(client->allocator, &puback_view); + struct aws_mqtt5_operation_puback *puback_op = + aws_mqtt5_operation_puback_new(client->allocator, &puback_view, completion_options); if (puback_op == NULL) { return AWS_OP_ERR; @@ -1883,20 +1939,25 @@ aws_mqtt5_callback_set_manager_on_publish_received(&client->callback_manager, publish_view); - /* Send a puback if QoS 1+ */ if (publish_view->qos != AWS_MQTT5_QOS_AT_MOST_ONCE) { + // Check if this PUBLSIH packet is manually controlled + struct aws_hash_element *elem = NULL; + aws_hash_table_find( + &client->operational_state.manual_puback_packet_id_table, &publish_view->packet_id, &elem); + + /* This PUBLISH isn't a manually controlled PUBACK. We schedule the PUBACK to be sent immediately. */ + if (elem == NULL) { + if (s_aws_mqtt5_client_queue_puback(client, publish_view->packet_id, NULL)) { + int error_code = aws_last_error(); + AWS_LOGF_ERROR( + AWS_LS_MQTT5_CLIENT, + "id=%p: decode failure with error %d(%s)", + (void *)client, + error_code, + aws_error_debug_str(error_code)); - int result = s_aws_mqtt5_client_queue_puback(client, publish_view->packet_id); - if (result != AWS_OP_SUCCESS) { - int error_code = aws_last_error(); - AWS_LOGF_ERROR( - AWS_LS_MQTT5_CLIENT, - "id=%p: decode failure with error %d(%s)", - (void *)client, - error_code, - aws_error_debug_str(error_code)); - - s_aws_mqtt5_client_shutdown_channel(client, error_code); + s_aws_mqtt5_client_shutdown_channel(client, error_code); + } } } break; @@ -2430,6 +2491,243 @@ return AWS_OP_ERR; } +struct aws_mqtt5_manual_puback_task { + struct aws_task task; + struct aws_allocator *allocator; + struct aws_mqtt5_client *client; + uint64_t puback_control_id; + struct aws_mqtt5_manual_puback_completion_options completion_options; +}; + +static void s_mqtt5_manual_puback_task_fn(struct aws_task *task, void *arg, enum aws_task_status status) { + (void)task; + + struct aws_mqtt5_manual_puback_task *manual_puback_task = arg; + struct aws_mqtt5_client *client = manual_puback_task->client; + uint64_t puback_control_id = manual_puback_task->puback_control_id; + + if (status != AWS_TASK_STATUS_RUN_READY) { + goto cleanup; + } + + AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(client->loop)); + + enum aws_mqtt5_manual_puback_result puback_result = AWS_MQTT5_MPR_SUCCESS; + + struct aws_hash_element *elem = NULL; + aws_hash_table_find(&client->operational_state.manual_puback_control_id_table, &puback_control_id, &elem); + + // We can schedule the PUBACK as an mqtt operation if it exists in the control id table. + if (elem != NULL) { + struct aws_mqtt5_manual_puback_entry *manual_puback_entry = + (struct aws_mqtt5_manual_puback_entry *)(elem->value); + + AWS_LOGF_DEBUG( + AWS_LS_MQTT5_CLIENT, + "id=%p: Scheuduling puback for control id: %llu for packet id: %d \n", + (void *)client, + (unsigned long long)manual_puback_entry->puback_control_id, + manual_puback_entry->packet_id); + + uint16_t packet_id = manual_puback_entry->packet_id; + + aws_hash_table_remove(&client->operational_state.manual_puback_packet_id_table, &packet_id, NULL, NULL); + // This removal from the control id table will also deallocate the manual puback entry. + aws_hash_table_remove( + &client->operational_state.manual_puback_control_id_table, &puback_control_id, NULL, NULL); + + if (s_aws_mqtt5_client_queue_puback(client, packet_id, &manual_puback_task->completion_options)) { + // this failure doesn't trigger the completion so we do it here before cleanup. + puback_result = AWS_MQTT5_MPR_CRT_FAILURE; + goto completion; + } + + // Completion will be handled by the puback mqtt operation so we go to cleanup. + goto cleanup; + } + + // If the control id isn't in the active table, check if it's been cancelled. + aws_hash_table_find(&client->operational_state.manual_puback_cancelled_control_id_table, &puback_control_id, &elem); + if (elem != NULL) { + AWS_LOGF_DEBUG( + AWS_LS_MQTT5_CLIENT, + "id=%p: puback_control_id: %llu has been cancelled due to a disconnection.", + (void *)client, + (unsigned long long)puback_control_id); + + // A cancelled control id has been used so it can be cleared from the table and deallocated. We only report + // a cancellation once. Past that we will simply treat it as invalid. + aws_hash_table_remove( + &client->operational_state.manual_puback_cancelled_control_id_table, &puback_control_id, NULL, NULL); + puback_result = AWS_MQTT5_MPR_PUBACK_CANCELLED; + goto completion; + } + + AWS_LOGF_DEBUG( + AWS_LS_MQTT5_CLIENT, + "id=%p: puback_control_id: %llu is not tracked in any way.", + (void *)client, + (unsigned long long)puback_control_id); + puback_result = AWS_MQTT5_MPR_PUBACK_INVALID; + +completion: + // We call the completion callback here in cases where there is no PUBACK operation scheduled on the client. + if (manual_puback_task->completion_options.completion_callback != NULL) { + manual_puback_task->completion_options.completion_callback( + puback_result, manual_puback_task->completion_options.completion_user_data); + } + +cleanup: + aws_mqtt5_client_release(manual_puback_task->client); + aws_mem_release(manual_puback_task->allocator, manual_puback_task); +} + +// Schedules task to process a manual PUBACK for provided puback_control_id +int aws_mqtt5_client_invoke_puback( + struct aws_mqtt5_client *client, + uint64_t puback_control_id, + const struct aws_mqtt5_manual_puback_completion_options *completion_options) { + AWS_PRECONDITION(client != NULL); + + struct aws_mqtt5_manual_puback_task *manual_puback_task = + aws_mem_calloc(client->allocator, 1, sizeof(struct aws_mqtt5_manual_puback_task)); + + aws_task_init( + &manual_puback_task->task, s_mqtt5_manual_puback_task_fn, manual_puback_task, "Mqtt5ManualPubackTask"); + manual_puback_task->allocator = client->allocator; + manual_puback_task->client = aws_mqtt5_client_acquire(client); + manual_puback_task->puback_control_id = puback_control_id; + if (completion_options != NULL) { + manual_puback_task->completion_options = *completion_options; + } + + aws_event_loop_schedule_task_now(client->loop, &manual_puback_task->task); + + return AWS_OP_SUCCESS; +} + +static void s_aws_mqtt5_manual_puback_entry_destroy(void *object) { + if (object == NULL) { + return; + } + struct aws_mqtt5_manual_puback_entry *manual_puback_entry = object; + aws_mem_release(manual_puback_entry->allocator, manual_puback_entry); +} + +static struct aws_mqtt5_manual_puback_entry *s_aws_mqtt_manual_puback_entry_new( + struct aws_allocator *allocator, + uint16_t packet_id, + uint64_t puback_control_id) { + + struct aws_mqtt5_manual_puback_entry *manual_puback_entry = + aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt5_manual_puback_entry)); + + manual_puback_entry->allocator = allocator; + aws_ref_count_init(&manual_puback_entry->ref_count, manual_puback_entry, s_aws_mqtt5_manual_puback_entry_destroy); + manual_puback_entry->packet_id = packet_id; + manual_puback_entry->puback_control_id = puback_control_id; + + return manual_puback_entry; +} + +uint64_t aws_mqtt5_client_acquire_puback( + struct aws_mqtt5_client *client, + const struct aws_mqtt5_packet_publish_view *publish_view) { + AWS_PRECONDITION(client != NULL); + AWS_PRECONDITION(publish_view != NULL); + /* This should only ever be called by the user within the publish received callback */ + AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(client->loop)); + + if (publish_view->qos == AWS_MQTT5_QOS_AT_MOST_ONCE) { + AWS_LOGF_ERROR( + AWS_LS_MQTT5_CLIENT, "id=%p: PUBACK control cannot be taken for a QoS 0 PUBLISH packet.", (void *)client); + return 0; + } + + /* First check if the packet_id for the PUBLISH is already being controlled. This can potentialy be the case if a + * PUBLISH that has its PUBACK acquired but hasn't been sent for long enough that the broker has sent the PUBLISH + * packet again. */ + struct aws_hash_element *elem = NULL; + aws_hash_table_find(&client->operational_state.manual_puback_packet_id_table, &publish_view->packet_id, &elem); + if (elem != NULL) { + /* In this case we simply provide the same control_id that was already sent before. We do not want to create a + * second control_id with the same packet_id. It is the user's responsibility to know that they have two PUBACKs + * for the same PUBLISH. */ + AWS_LOGF_WARN( + AWS_LS_MQTT5_CLIENT, + "id=%p: PUBACK acquire called on a PUBLISH that is already under user control.", + (void *)client); + struct aws_mqtt5_manual_puback_entry *entry = elem->value; + return entry->puback_control_id; + } + + // The current_control_packet_id is incremented each time a new manual puback is scheduled. + uint64_t current_control_packet_id = client->operational_state.next_mqtt5_puback_control_id; + struct aws_mqtt5_manual_puback_entry *manual_puback = + s_aws_mqtt_manual_puback_entry_new(client->allocator, publish_view->packet_id, current_control_packet_id); + + /* Allows lookup of manual puback entries by packet id. */ + if (aws_hash_table_put( + &client->operational_state.manual_puback_packet_id_table, &manual_puback->packet_id, manual_puback, NULL)) { + int error_code = aws_last_error(); + AWS_LOGF_ERROR( + AWS_LS_MQTT5_CLIENT, + "id=%p: Failed to insert manual PUBACK entry into packet ID table: %d(%s)", + (void *)client, + error_code, + aws_error_debug_str(error_code)); + goto cleanup; + } + /* We incref here because the packet_id table also has a ref to the manual_puback */ + aws_ref_count_acquire(&manual_puback->ref_count); + + /* Allows lookup of manual puback entries by control id */ + if (aws_hash_table_put( + &client->operational_state.manual_puback_control_id_table, + &manual_puback->puback_control_id, + manual_puback, + NULL)) { + int error_code = aws_last_error(); + AWS_LOGF_ERROR( + AWS_LS_MQTT5_CLIENT, + "id=%p: Failed to insert manual PUBACK entry into control ID table: %d(%s)", + (void *)client, + error_code, + aws_error_debug_str(error_code)); + // clean up the manual puback entry from the packet id table and deallocate. + aws_hash_table_remove( + &client->operational_state.manual_puback_packet_id_table, &manual_puback->packet_id, NULL, NULL); + goto cleanup; + } + + /* Increment next_mqtt5_puback_control_id for next use */ + client->operational_state.next_mqtt5_puback_control_id = current_control_packet_id + 1; + + size_t in_flight_unacked_publishes = + aws_hash_table_get_entry_count(&client->operational_state.manual_puback_control_id_table); + if (in_flight_unacked_publishes >= 100) { + AWS_LOGF_WARN( + AWS_LS_MQTT5_CLIENT, + "id=%p: 100 or more PUBACKs under user control: %zu. AWS IoT Core limits of 100 in-flight has been met or " + "exceeded.", + (void *)client, + in_flight_unacked_publishes); + } else { + AWS_LOGF_DEBUG( + AWS_LS_MQTT5_CLIENT, + "id=%p: Manual PUBACK control taken for a PUBLISH packet. Current in-flight PUBACKs under user control: " + "%zu", + (void *)client, + in_flight_unacked_publishes); + } + + return manual_puback->puback_control_id; + +cleanup: + aws_mem_release(manual_puback->allocator, manual_puback); + return 0; +} + static bool s_needs_packet_id(const struct aws_mqtt5_operation *operation) { switch (operation->packet_type) { case AWS_MQTT5_PT_SUBSCRIBE: @@ -2523,6 +2821,39 @@ return AWS_OP_ERR; } + if (aws_hash_table_init( + &client_operational_state->manual_puback_packet_id_table, + allocator, + DEFAULT_MQTT5_OPERATION_TABLE_SIZE, + aws_mqtt_hash_uint16_t, + aws_mqtt_compare_uint16_t_eq, + NULL, + s_aws_mqtt5_manual_puback_entry_decref)) { + return AWS_OP_ERR; + } + + if (aws_hash_table_init( + &client_operational_state->manual_puback_control_id_table, + allocator, + DEFAULT_MQTT5_OPERATION_TABLE_SIZE, + aws_mqtt_hash_uint64_t, + aws_mqtt_compare_uint64_t_eq, + NULL, + s_aws_mqtt5_manual_puback_entry_decref)) { + return AWS_OP_ERR; + } + + if (aws_hash_table_init( + &client_operational_state->manual_puback_cancelled_control_id_table, + allocator, + DEFAULT_MQTT5_OPERATION_TABLE_SIZE, + aws_mqtt_hash_uint64_t, + aws_mqtt_compare_uint64_t_eq, + NULL, + s_aws_mqtt5_manual_puback_entry_decref)) { + return AWS_OP_ERR; + } + if (aws_priority_queue_init_dynamic( &client_operational_state->operations_by_ack_timeout, allocator, @@ -2533,6 +2864,7 @@ } client_operational_state->next_mqtt_packet_id = 1; + client_operational_state->next_mqtt5_puback_control_id = 1; client_operational_state->current_operation = NULL; client_operational_state->client = client; @@ -2648,6 +2980,8 @@ aws_hash_table_clear(&client->operational_state.unacked_operations_table); aws_priority_queue_clear(&client->operational_state.operations_by_ack_timeout); + s_aws_mqtt5_reset_manual_puback_tables(client_operational_state); + /* * Prevents inbound resolution on the highly unlikely, illegal server behavior of sending a PUBLISH before * a CONNACK on next connection establishment. @@ -2841,8 +3175,8 @@ * Estimate the # of ethernet frames (max 1444 bytes) and add in potential TLS framing and padding values per. * * TODO: query IoT Core to determine if this calculation is needed after all - * TODO: may eventually want to expose the ethernet frame size here as a configurable option for networks that have a - * lower MTU + * TODO: may eventually want to expose the ethernet frame size here as a configurable option for networks that have + * a lower MTU * * References: * https://tools.ietf.org/id/draft-mattsson-uta-tls-overhead-01.xml#rfc.section.3 @@ -3129,9 +3463,9 @@ * sporadically fail because the PINGRESP is processed before the write completion callback is * invoked. * - * (2) Enqueue the ping - if the current operation is a large payload over a poor connection, it may - * be an arbitrarily long time before the current operation completes and the ping even has a chance - * to go out, meaning we will trigger a ping time out before it's even sent. + * (2) Enqueue the ping - if the current operation is a large payload over a poor connection, it + * may be an arbitrarily long time before the current operation completes and the ping even has a + * chance to go out, meaning we will trigger a ping time out before it's even sent. * * Given a reasonable io message size, this is the best place to set the timeout. */ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/aws-c-mqtt-0.14.0/source/v5/mqtt5_options_storage.c new/aws-c-mqtt-0.15.0/source/v5/mqtt5_options_storage.c --- old/aws-c-mqtt-0.14.0/source/v5/mqtt5_options_storage.c 2026-02-17 18:31:31.000000000 +0100 +++ new/aws-c-mqtt-0.15.0/source/v5/mqtt5_options_storage.c 2026-03-11 19:20:15.000000000 +0100 @@ -2287,6 +2287,47 @@ "aws_mqtt5_packet_puback_view"); } +static void s_aws_mqtt5_operation_puback_manual_completion( + struct aws_mqtt5_operation *operation, + int error_code, + enum aws_mqtt5_packet_type packet_type, + const void *completion_view) { + (void)packet_type; + (void)completion_view; + struct aws_mqtt5_operation_puback *puback_op = operation->impl; + + /* Completion callback on manual PUBACK. + * Completion callback options are not currently bound out as there is an edge case where + * we would return a successful completion of the manual puback as a redriven PUBLISH packet + * is simultaneously received. This could cause confusion to the user and cause them to + * assume the redriven PUBLISH is a new PUBLISH and not a duplicate of the one they have just + * gotten a PUBACK invoke success from. We may re-instate this in the future upon customer + * request. We retain the logic to help us provide better logs of what is going on in the + * event we need to track down bugs for a user or ourselves. + */ + if (puback_op->completion_options.completion_callback != NULL) { + // Convert error_code to manual puback result + enum aws_mqtt5_manual_puback_result puback_result = AWS_MQTT5_MPR_SUCCESS; + if (error_code != AWS_OP_SUCCESS) { + /* There is a significant list of possible errors that could have occurred during the processing of a + * PUBACK. Instead of mapping each one, we report a CRT failure which should indicate the important part. + * That the PUBACK was not sent and it's likely they will receive a duplicate PUBLISH. If they want more + * details the logs will need to be investigated. */ + puback_result = AWS_MQTT5_MPR_CRT_FAILURE; + } + puback_op->completion_options.completion_callback( + puback_result, puback_op->completion_options.completion_user_data); + } +} + +static struct aws_mqtt5_operation_vtable s_puback_operation_vtable = { + .aws_mqtt5_operation_completion_fn = s_aws_mqtt5_operation_puback_manual_completion, + .aws_mqtt5_operation_set_packet_id_fn = NULL, + .aws_mqtt5_operation_get_packet_id_address_fn = NULL, + .aws_mqtt5_operation_validate_vs_connection_settings_fn = NULL, + .aws_mqtt5_operation_get_ack_timeout_override_fn = NULL, +}; + static void s_destroy_operation_puback(void *object) { if (object == NULL) { return; @@ -2301,7 +2342,8 @@ struct aws_mqtt5_operation_puback *aws_mqtt5_operation_puback_new( struct aws_allocator *allocator, - const struct aws_mqtt5_packet_puback_view *puback_options) { + const struct aws_mqtt5_packet_puback_view *puback_options, + const struct aws_mqtt5_manual_puback_completion_options *completion_options) { AWS_PRECONDITION(allocator != NULL); AWS_PRECONDITION(puback_options != NULL); @@ -2312,8 +2354,11 @@ } puback_op->allocator = allocator; - puback_op->base.vtable = &s_empty_operation_vtable; + puback_op->base.vtable = &s_puback_operation_vtable; puback_op->base.packet_type = AWS_MQTT5_PT_PUBACK; + if (completion_options != NULL) { + puback_op->completion_options = *completion_options; + } aws_ref_count_init(&puback_op->base.ref_count, puback_op, s_destroy_operation_puback); aws_priority_queue_node_init(&puback_op->base.priority_queue_node); puback_op->base.impl = puback_op; diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/aws-c-mqtt-0.14.0/tests/CMakeLists.txt new/aws-c-mqtt-0.15.0/tests/CMakeLists.txt --- old/aws-c-mqtt-0.14.0/tests/CMakeLists.txt 2026-02-17 18:31:31.000000000 +0100 +++ new/aws-c-mqtt-0.15.0/tests/CMakeLists.txt 2026-03-11 19:20:15.000000000 +0100 @@ -402,6 +402,15 @@ add_test_case(mqtt5_client_outbound_alias_manual_success_a_b_ar_br) add_test_case(mqtt5_client_outbound_alias_lru_success_a_b_c_br_cr_a) +# MQTT5 manual puback tests + +add_test_case(mqtt5_client_manual_puback_basic_success) +add_test_case(mqtt5_client_manual_puback_no_auto_puback) +add_test_case(mqtt5_client_manual_puback_invalid_control_id) +add_test_case(mqtt5_client_manual_puback_multiple_publishes) +add_test_case(mqtt5_client_manual_puback_disconnect_cancellation) + + # Mqtt5 Metrics tests add_test_case(mqtt5_client_set_metrics_valid) add_test_case(mqtt5_client_set_metrics_null) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/aws-c-mqtt-0.14.0/tests/v5/mqtt5_client_manual_puback_tests.c new/aws-c-mqtt-0.15.0/tests/v5/mqtt5_client_manual_puback_tests.c --- old/aws-c-mqtt-0.14.0/tests/v5/mqtt5_client_manual_puback_tests.c 1970-01-01 01:00:00.000000000 +0100 +++ new/aws-c-mqtt-0.15.0/tests/v5/mqtt5_client_manual_puback_tests.c 2026-03-11 19:20:15.000000000 +0100 @@ -0,0 +1,652 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include "mqtt5_testing_utils.h" + +#include <aws/common/clock.h> +#include <aws/common/string.h> +#include <aws/mqtt/mqtt.h> +#include <aws/mqtt/private/v5/mqtt5_utils.h> +#include <aws/mqtt/v5/mqtt5_client.h> +#include <aws/mqtt/v5/mqtt5_listener.h> + +#include <aws/testing/aws_test_harness.h> + +/** + * Test context for manual PUBACK tests + */ +struct aws_mqtt5_manual_puback_test_context { + struct aws_mqtt5_client_mock_test_fixture *test_fixture; + + /* Control IDs for manual puback */ + uint64_t puback_control_ids[10]; + size_t puback_control_count; + + /* Results from manual puback operations */ + enum aws_mqtt5_manual_puback_result puback_results[10]; + size_t puback_result_count; + + /* Tracking flags */ + bool publish_received; + size_t publishes_received_count; + bool puback_callback_invoked; + size_t puback_callbacks_invoked_count; + + /* For tracking publish packet IDs */ + uint16_t received_packet_ids[10]; + size_t received_packet_id_count; +}; + +static void s_manual_puback_test_context_init(struct aws_mqtt5_manual_puback_test_context *context) { + AWS_ZERO_STRUCT(*context); +} + +static void s_manual_puback_completion_fn(enum aws_mqtt5_manual_puback_result puback_result, void *complete_ctx) { + + struct aws_mqtt5_manual_puback_test_context *context = complete_ctx; + struct aws_mqtt5_client_mock_test_fixture *test_fixture = context->test_fixture; + + aws_mutex_lock(&test_fixture->lock); + + if (context->puback_result_count < AWS_ARRAY_SIZE(context->puback_results)) { + context->puback_results[context->puback_result_count++] = puback_result; + } + + context->puback_callback_invoked = true; + context->puback_callbacks_invoked_count++; + + aws_mutex_unlock(&test_fixture->lock); + aws_condition_variable_notify_all(&test_fixture->signal); +} + +static bool s_manual_puback_publish_received_handler( + const struct aws_mqtt5_packet_publish_view *publish, + void *user_data) { + + struct aws_mqtt5_manual_puback_test_context *context = user_data; + struct aws_mqtt5_client_mock_test_fixture *test_fixture = context->test_fixture; + + if (publish->qos == AWS_MQTT5_QOS_AT_LEAST_ONCE) { + aws_mutex_lock(&test_fixture->lock); + + /* Acquire manual puback control */ + uint64_t control_id = aws_mqtt5_client_acquire_puback(test_fixture->client, publish); + + if (context->puback_control_count < AWS_ARRAY_SIZE(context->puback_control_ids)) { + context->puback_control_ids[context->puback_control_count++] = control_id; + } + + if (context->received_packet_id_count < AWS_ARRAY_SIZE(context->received_packet_ids)) { + context->received_packet_ids[context->received_packet_id_count++] = publish->packet_id; + } + + context->publish_received = true; + context->publishes_received_count++; + + aws_mutex_unlock(&test_fixture->lock); + aws_condition_variable_notify_all(&test_fixture->signal); + + return true; /* Signal we handled it manually */ + } + + return false; +} + +static bool s_manual_puback_callback_invoked(void *arg) { + struct aws_mqtt5_manual_puback_test_context *context = arg; + return context->puback_callback_invoked; +} + +static void s_wait_for_manual_puback_callback(struct aws_mqtt5_manual_puback_test_context *context) { + struct aws_mqtt5_client_mock_test_fixture *test_fixture = context->test_fixture; + + aws_mutex_lock(&test_fixture->lock); + aws_condition_variable_wait_pred( + &test_fixture->signal, &test_fixture->lock, s_manual_puback_callback_invoked, context); + aws_mutex_unlock(&test_fixture->lock); +} + +static bool s_manual_puback_publish_received(void *arg) { + struct aws_mqtt5_manual_puback_test_context *context = arg; + return context->publish_received; +} + +static void s_wait_for_manual_puback_publish(struct aws_mqtt5_manual_puback_test_context *context) { + struct aws_mqtt5_client_mock_test_fixture *test_fixture = context->test_fixture; + + aws_mutex_lock(&test_fixture->lock); + aws_condition_variable_wait_pred( + &test_fixture->signal, &test_fixture->lock, s_manual_puback_publish_received, context); + aws_mutex_unlock(&test_fixture->lock); +} + +static void s_wait_for_n_manual_puback_publishes(struct aws_mqtt5_manual_puback_test_context *context, size_t count) { + struct aws_mqtt5_client_mock_test_fixture *test_fixture = context->test_fixture; + + aws_mutex_lock(&test_fixture->lock); + while (context->publishes_received_count < count) { + aws_condition_variable_wait(&test_fixture->signal, &test_fixture->lock); + } + aws_mutex_unlock(&test_fixture->lock); +} + +static void s_wait_for_n_manual_puback_callbacks(struct aws_mqtt5_manual_puback_test_context *context, size_t count) { + struct aws_mqtt5_client_mock_test_fixture *test_fixture = context->test_fixture; + + aws_mutex_lock(&test_fixture->lock); + while (context->puback_callbacks_invoked_count < count) { + aws_condition_variable_wait(&test_fixture->signal, &test_fixture->lock); + } + aws_mutex_unlock(&test_fixture->lock); +} + +/* Mock server handler that sends a QoS1 PUBLISH after CONNACK */ +static uint8_t s_test_topic[] = "test/manual/puback"; +static uint8_t s_test_payload[] = "test_payload"; + +struct aws_mqtt5_server_manual_puback_context { + struct aws_mqtt5_client_mock_test_fixture *test_fixture; + bool publish_sent; + bool connack_sent; + size_t publishes_to_send; +}; + +static void s_aws_mqtt5_mock_server_send_qos1_publish( + struct aws_mqtt5_server_mock_connection_context *mock_server, + void *user_data) { + + struct aws_mqtt5_server_manual_puback_context *context = user_data; + + if (context->publish_sent || !context->connack_sent) { + return; + } + + context->publish_sent = true; + + size_t count = context->publishes_to_send > 0 ? context->publishes_to_send : 1; + + for (size_t i = 0; i < count; i++) { + struct aws_mqtt5_packet_publish_view publish_view = { + .packet_id = (uint16_t)(i + 1), + .qos = AWS_MQTT5_QOS_AT_LEAST_ONCE, + .topic = + { + .ptr = s_test_topic, + .len = AWS_ARRAY_SIZE(s_test_topic) - 1, + }, + .payload = + { + .ptr = s_test_payload, + .len = AWS_ARRAY_SIZE(s_test_payload) - 1, + }, + }; + + aws_mqtt5_mock_server_send_packet(mock_server, AWS_MQTT5_PT_PUBLISH, &publish_view); + } +} + +static int s_aws_mqtt5_server_send_qos1_publish_on_connect( + void *packet, + struct aws_mqtt5_server_mock_connection_context *connection, + void *user_data) { + + int result = aws_mqtt5_mock_server_handle_connect_always_succeed(packet, connection, user_data); + + struct aws_mqtt5_server_manual_puback_context *context = user_data; + context->connack_sent = true; + + return result; +} + +/** + * Basic manual PUBACK success + * - Server sends QoS1 PUBLISH + * - Client acquires manual puback control + * - Client invokes puback + * - Verify success result and server receives PUBACK + */ +static int s_mqtt5_client_manual_puback_basic_success_fn(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + aws_mqtt_library_init(allocator); + + struct mqtt5_client_test_options test_options; + aws_mqtt5_client_test_init_default_options(&test_options); + + struct aws_mqtt5_manual_puback_test_context puback_context; + s_manual_puback_test_context_init(&puback_context); + + struct aws_mqtt5_server_manual_puback_context server_context = { + .publish_sent = false, + .connack_sent = false, + .publishes_to_send = 1, + }; + + test_options.server_function_table.service_task_fn = s_aws_mqtt5_mock_server_send_qos1_publish; + test_options.server_function_table.packet_handlers[AWS_MQTT5_PT_CONNECT] = + s_aws_mqtt5_server_send_qos1_publish_on_connect; + + struct aws_mqtt5_client_mqtt5_mock_test_fixture_options test_fixture_options = { + .client_options = &test_options.client_options, + .server_function_table = &test_options.server_function_table, + .mock_server_user_data = &server_context, + }; + + struct aws_mqtt5_client_mock_test_fixture test_context; + ASSERT_SUCCESS(aws_mqtt5_client_mock_test_fixture_init(&test_context, allocator, &test_fixture_options)); + + puback_context.test_fixture = &test_context; + server_context.test_fixture = &test_context; + + /* Create listener to intercept publishes */ + struct aws_mqtt5_listener_config listener_config = { + .client = test_context.client, + .listener_callbacks = { + .listener_publish_received_handler = s_manual_puback_publish_received_handler, + .listener_publish_received_handler_user_data = &puback_context, + }}; + + struct aws_mqtt5_listener *listener = aws_mqtt5_listener_new(allocator, &listener_config); + ASSERT_NOT_NULL(listener); + + /* Start client */ + ASSERT_SUCCESS(aws_mqtt5_client_start(test_context.client)); + aws_wait_for_connected_lifecycle_event(&test_context); + + /* Wait for publish to be received */ + s_wait_for_manual_puback_publish(&puback_context); + + /* Verify we got a control ID */ + aws_mutex_lock(&test_context.lock); + ASSERT_TRUE(puback_context.puback_control_count == 1); + ASSERT_TRUE(puback_context.puback_control_ids[0] != 0); + uint64_t control_id = puback_context.puback_control_ids[0]; + aws_mutex_unlock(&test_context.lock); + + /* Invoke manual puback */ + struct aws_mqtt5_manual_puback_completion_options completion_options = { + .completion_callback = s_manual_puback_completion_fn, + .completion_user_data = &puback_context, + }; + + ASSERT_SUCCESS(aws_mqtt5_client_invoke_puback(test_context.client, control_id, &completion_options)); + + /* Wait for completion callback */ + s_wait_for_manual_puback_callback(&puback_context); + + /* Verify success result */ + aws_mutex_lock(&test_context.lock); + ASSERT_INT_EQUALS(1, puback_context.puback_result_count); + ASSERT_INT_EQUALS(AWS_MQTT5_MPR_SUCCESS, puback_context.puback_results[0]); + aws_mutex_unlock(&test_context.lock); + + /* Clean up */ + aws_mqtt5_listener_release(listener); + ASSERT_SUCCESS(aws_mqtt5_client_stop(test_context.client, NULL, NULL)); + aws_wait_for_stopped_lifecycle_event(&test_context); + + aws_mqtt5_client_mock_test_fixture_clean_up(&test_context); + aws_mqtt_library_clean_up(); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE(mqtt5_client_manual_puback_basic_success, s_mqtt5_client_manual_puback_basic_success_fn) + +/** + * Verify no automatic PUBACK when manual control taken + * - Server sends QoS1 PUBLISH + * - Client acquires manual puback control + * - Wait to verify no automatic PUBACK sent + * - Client invokes manual puback + * - Verify PUBACK is now sent + */ +static int s_mqtt5_client_manual_puback_no_auto_puback_fn(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + aws_mqtt_library_init(allocator); + + struct mqtt5_client_test_options test_options; + aws_mqtt5_client_test_init_default_options(&test_options); + + struct aws_mqtt5_manual_puback_test_context puback_context; + s_manual_puback_test_context_init(&puback_context); + + struct aws_mqtt5_server_manual_puback_context server_context = { + .publish_sent = false, + .connack_sent = false, + .publishes_to_send = 1, + }; + + test_options.server_function_table.service_task_fn = s_aws_mqtt5_mock_server_send_qos1_publish; + test_options.server_function_table.packet_handlers[AWS_MQTT5_PT_CONNECT] = + s_aws_mqtt5_server_send_qos1_publish_on_connect; + + struct aws_mqtt5_client_mqtt5_mock_test_fixture_options test_fixture_options = { + .client_options = &test_options.client_options, + .server_function_table = &test_options.server_function_table, + .mock_server_user_data = &server_context, + }; + + struct aws_mqtt5_client_mock_test_fixture test_context; + ASSERT_SUCCESS(aws_mqtt5_client_mock_test_fixture_init(&test_context, allocator, &test_fixture_options)); + + puback_context.test_fixture = &test_context; + server_context.test_fixture = &test_context; + + /* Create listener */ + struct aws_mqtt5_listener_config listener_config = { + .client = test_context.client, + .listener_callbacks = { + .listener_publish_received_handler = s_manual_puback_publish_received_handler, + .listener_publish_received_handler_user_data = &puback_context, + }}; + + struct aws_mqtt5_listener *listener = aws_mqtt5_listener_new(allocator, &listener_config); + + ASSERT_SUCCESS(aws_mqtt5_client_start(test_context.client)); + aws_wait_for_connected_lifecycle_event(&test_context); + + s_wait_for_manual_puback_publish(&puback_context); + + aws_mutex_lock(&test_context.lock); + uint64_t control_id = puback_context.puback_control_ids[0]; + aws_mutex_unlock(&test_context.lock); + + /* Wait a bit to ensure no automatic PUBACK is sent */ + aws_thread_current_sleep(aws_timestamp_convert(1, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL)); + + /* Verify no PUBACK received by server yet */ + aws_mutex_lock(&test_context.lock); + size_t packet_count = aws_array_list_length(&test_context.server_received_packets); + bool found_puback = false; + for (size_t i = 0; i < packet_count; i++) { + struct aws_mqtt5_mock_server_packet_record *record = NULL; + aws_array_list_get_at_ptr(&test_context.server_received_packets, (void **)&record, i); + if (record->packet_type == AWS_MQTT5_PT_PUBACK) { + found_puback = true; + break; + } + } + ASSERT_FALSE(found_puback); + aws_mutex_unlock(&test_context.lock); + + /* Now invoke manual puback */ + struct aws_mqtt5_manual_puback_completion_options completion_options = { + .completion_callback = s_manual_puback_completion_fn, + .completion_user_data = &puback_context, + }; + + ASSERT_SUCCESS(aws_mqtt5_client_invoke_puback(test_context.client, control_id, &completion_options)); + s_wait_for_manual_puback_callback(&puback_context); + + /* Verify PUBACK now received by server */ + aws_thread_current_sleep(aws_timestamp_convert(100, AWS_TIMESTAMP_MILLIS, AWS_TIMESTAMP_NANOS, NULL)); + + aws_mutex_lock(&test_context.lock); + packet_count = aws_array_list_length(&test_context.server_received_packets); + found_puback = false; + for (size_t i = 0; i < packet_count; i++) { + struct aws_mqtt5_mock_server_packet_record *record = NULL; + aws_array_list_get_at_ptr(&test_context.server_received_packets, (void **)&record, i); + if (record->packet_type == AWS_MQTT5_PT_PUBACK) { + found_puback = true; + break; + } + } + ASSERT_TRUE(found_puback); + aws_mutex_unlock(&test_context.lock); + + aws_mqtt5_listener_release(listener); + ASSERT_SUCCESS(aws_mqtt5_client_stop(test_context.client, NULL, NULL)); + aws_wait_for_stopped_lifecycle_event(&test_context); + + aws_mqtt5_client_mock_test_fixture_clean_up(&test_context); + aws_mqtt_library_clean_up(); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE(mqtt5_client_manual_puback_no_auto_puback, s_mqtt5_client_manual_puback_no_auto_puback_fn) + +/** + * Invalid control ID + * - Invoke puback with invalid control ID + * - Verify AWS_MQTT5_MPR_PUBACK_INVALID result + */ +static int s_mqtt5_client_manual_puback_invalid_control_id_fn(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + aws_mqtt_library_init(allocator); + + struct mqtt5_client_test_options test_options; + aws_mqtt5_client_test_init_default_options(&test_options); + + struct aws_mqtt5_manual_puback_test_context puback_context; + s_manual_puback_test_context_init(&puback_context); + + struct aws_mqtt5_client_mqtt5_mock_test_fixture_options test_fixture_options = { + .client_options = &test_options.client_options, + .server_function_table = &test_options.server_function_table, + }; + + struct aws_mqtt5_client_mock_test_fixture test_context; + ASSERT_SUCCESS(aws_mqtt5_client_mock_test_fixture_init(&test_context, allocator, &test_fixture_options)); + + puback_context.test_fixture = &test_context; + + ASSERT_SUCCESS(aws_mqtt5_client_start(test_context.client)); + aws_wait_for_connected_lifecycle_event(&test_context); + + /* Try to invoke puback with invalid control ID */ + struct aws_mqtt5_manual_puback_completion_options completion_options = { + .completion_callback = s_manual_puback_completion_fn, + .completion_user_data = &puback_context, + }; + + uint64_t invalid_control_id = 999999; + ASSERT_SUCCESS(aws_mqtt5_client_invoke_puback(test_context.client, invalid_control_id, &completion_options)); + + s_wait_for_manual_puback_callback(&puback_context); + + /* Verify invalid result */ + aws_mutex_lock(&test_context.lock); + ASSERT_INT_EQUALS(1, puback_context.puback_result_count); + ASSERT_INT_EQUALS(AWS_MQTT5_MPR_PUBACK_INVALID, puback_context.puback_results[0]); + aws_mutex_unlock(&test_context.lock); + + ASSERT_SUCCESS(aws_mqtt5_client_stop(test_context.client, NULL, NULL)); + aws_wait_for_stopped_lifecycle_event(&test_context); + + aws_mqtt5_client_mock_test_fixture_clean_up(&test_context); + aws_mqtt_library_clean_up(); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE(mqtt5_client_manual_puback_invalid_control_id, s_mqtt5_client_manual_puback_invalid_control_id_fn) + +/** + * Multiple PUBLISHes with manual PUBACK + * - Server sends 3 QoS1 PUBLISHes + * - Client acquires control for all 3 + * - Client invokes puback for them (in reverse order) + * - Verify all succeed + */ +static int s_mqtt5_client_manual_puback_multiple_publishes_fn(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + aws_mqtt_library_init(allocator); + + struct mqtt5_client_test_options test_options; + aws_mqtt5_client_test_init_default_options(&test_options); + + struct aws_mqtt5_manual_puback_test_context puback_context; + s_manual_puback_test_context_init(&puback_context); + + struct aws_mqtt5_server_manual_puback_context server_context = { + .publish_sent = false, + .connack_sent = false, + .publishes_to_send = 3, + }; + + test_options.server_function_table.service_task_fn = s_aws_mqtt5_mock_server_send_qos1_publish; + test_options.server_function_table.packet_handlers[AWS_MQTT5_PT_CONNECT] = + s_aws_mqtt5_server_send_qos1_publish_on_connect; + + struct aws_mqtt5_client_mqtt5_mock_test_fixture_options test_fixture_options = { + .client_options = &test_options.client_options, + .server_function_table = &test_options.server_function_table, + .mock_server_user_data = &server_context, + }; + + struct aws_mqtt5_client_mock_test_fixture test_context; + ASSERT_SUCCESS(aws_mqtt5_client_mock_test_fixture_init(&test_context, allocator, &test_fixture_options)); + + puback_context.test_fixture = &test_context; + server_context.test_fixture = &test_context; + + struct aws_mqtt5_listener_config listener_config = { + .client = test_context.client, + .listener_callbacks = { + .listener_publish_received_handler = s_manual_puback_publish_received_handler, + .listener_publish_received_handler_user_data = &puback_context, + }}; + + struct aws_mqtt5_listener *listener = aws_mqtt5_listener_new(allocator, &listener_config); + + ASSERT_SUCCESS(aws_mqtt5_client_start(test_context.client)); + aws_wait_for_connected_lifecycle_event(&test_context); + + /* Wait for all 3 publishes */ + s_wait_for_n_manual_puback_publishes(&puback_context, 3); + + /* Verify we got 3 control IDs */ + aws_mutex_lock(&test_context.lock); + ASSERT_INT_EQUALS(3, puback_context.puback_control_count); + aws_mutex_unlock(&test_context.lock); + + /* Invoke pubacks in reverse order */ + struct aws_mqtt5_manual_puback_completion_options completion_options = { + .completion_callback = s_manual_puback_completion_fn, + .completion_user_data = &puback_context, + }; + + for (int i = 2; i >= 0; i--) { + ASSERT_SUCCESS(aws_mqtt5_client_invoke_puback( + test_context.client, puback_context.puback_control_ids[i], &completion_options)); + } + + /* Wait for all callbacks */ + s_wait_for_n_manual_puback_callbacks(&puback_context, 3); + + /* Verify all succeeded */ + aws_mutex_lock(&test_context.lock); + ASSERT_INT_EQUALS(3, puback_context.puback_result_count); + for (size_t i = 0; i < 3; i++) { + ASSERT_INT_EQUALS(AWS_MQTT5_MPR_SUCCESS, puback_context.puback_results[i]); + } + aws_mutex_unlock(&test_context.lock); + + aws_mqtt5_listener_release(listener); + ASSERT_SUCCESS(aws_mqtt5_client_stop(test_context.client, NULL, NULL)); + aws_wait_for_stopped_lifecycle_event(&test_context); + + aws_mqtt5_client_mock_test_fixture_clean_up(&test_context); + aws_mqtt_library_clean_up(); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE(mqtt5_client_manual_puback_multiple_publishes, s_mqtt5_client_manual_puback_multiple_publishes_fn) + +/** + * Disconnect cancels pending manual PUBACKs + * - Server sends QoS1 PUBLISH + * - Client acquires manual puback control + * - Disconnect before invoking puback + * - Invoke puback after disconnect + * - Verify AWS_MQTT5_MPR_PUBACK_CANCELLED result + */ +static int s_mqtt5_client_manual_puback_disconnect_cancellation_fn(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + aws_mqtt_library_init(allocator); + + struct mqtt5_client_test_options test_options; + aws_mqtt5_client_test_init_default_options(&test_options); + + struct aws_mqtt5_manual_puback_test_context puback_context; + s_manual_puback_test_context_init(&puback_context); + + struct aws_mqtt5_server_manual_puback_context server_context = { + .publish_sent = false, + .connack_sent = false, + .publishes_to_send = 1, + }; + + test_options.server_function_table.service_task_fn = s_aws_mqtt5_mock_server_send_qos1_publish; + test_options.server_function_table.packet_handlers[AWS_MQTT5_PT_CONNECT] = + s_aws_mqtt5_server_send_qos1_publish_on_connect; + + struct aws_mqtt5_client_mqtt5_mock_test_fixture_options test_fixture_options = { + .client_options = &test_options.client_options, + .server_function_table = &test_options.server_function_table, + .mock_server_user_data = &server_context, + }; + + struct aws_mqtt5_client_mock_test_fixture test_context; + ASSERT_SUCCESS(aws_mqtt5_client_mock_test_fixture_init(&test_context, allocator, &test_fixture_options)); + + puback_context.test_fixture = &test_context; + server_context.test_fixture = &test_context; + + struct aws_mqtt5_listener_config listener_config = { + .client = test_context.client, + .listener_callbacks = { + .listener_publish_received_handler = s_manual_puback_publish_received_handler, + .listener_publish_received_handler_user_data = &puback_context, + }}; + + struct aws_mqtt5_listener *listener = aws_mqtt5_listener_new(allocator, &listener_config); + + ASSERT_SUCCESS(aws_mqtt5_client_start(test_context.client)); + aws_wait_for_connected_lifecycle_event(&test_context); + + s_wait_for_manual_puback_publish(&puback_context); + + aws_mutex_lock(&test_context.lock); + uint64_t control_id = puback_context.puback_control_ids[0]; + aws_mutex_unlock(&test_context.lock); + + /* Disconnect before invoking puback */ + ASSERT_SUCCESS(aws_mqtt5_client_stop(test_context.client, NULL, NULL)); + aws_wait_for_stopped_lifecycle_event(&test_context); + + /* Now try to invoke puback after disconnect */ + struct aws_mqtt5_manual_puback_completion_options completion_options = { + .completion_callback = s_manual_puback_completion_fn, + .completion_user_data = &puback_context, + }; + + ASSERT_SUCCESS(aws_mqtt5_client_invoke_puback(test_context.client, control_id, &completion_options)); + s_wait_for_manual_puback_callback(&puback_context); + + /* Verify cancelled result */ + aws_mutex_lock(&test_context.lock); + ASSERT_INT_EQUALS(1, puback_context.puback_result_count); + ASSERT_INT_EQUALS(AWS_MQTT5_MPR_PUBACK_CANCELLED, puback_context.puback_results[0]); + aws_mutex_unlock(&test_context.lock); + + aws_mqtt5_listener_release(listener); + aws_mqtt5_client_mock_test_fixture_clean_up(&test_context); + aws_mqtt_library_clean_up(); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE( + mqtt5_client_manual_puback_disconnect_cancellation, + s_mqtt5_client_manual_puback_disconnect_cancellation_fn) \ No newline at end of file diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/aws-c-mqtt-0.14.0/tests/v5/mqtt5_testing_utils.h new/aws-c-mqtt-0.15.0/tests/v5/mqtt5_testing_utils.h --- old/aws-c-mqtt-0.14.0/tests/v5/mqtt5_testing_utils.h 2026-02-17 18:31:31.000000000 +0100 +++ new/aws-c-mqtt-0.15.0/tests/v5/mqtt5_testing_utils.h 2026-03-11 19:20:15.000000000 +0100 @@ -58,16 +58,54 @@ struct aws_task service_task; }; +/** + * Callback invoked when the mock server receives a packet from the client. + * + * @param packet_view The decoded packet view (type-specific, e.g., aws_mqtt5_packet_connect_view) + * @param connection The mock server connection context + * @param packet_received_user_data User data from the vtable (mock_server_user_data) + * @return AWS_OP_SUCCESS on success, AWS_OP_ERR on failure + */ typedef int(aws_mqtt5_on_mock_server_packet_received_fn)( void *packet_view, struct aws_mqtt5_server_mock_connection_context *connection, void *packet_received_user_data); +/** + * Periodic service callback for the mock MQTT5 server. + * + * This function is called on a periodic timer (every 1 second) and allows the mock server + * to perform proactive operations such as: + * - Sending server-initiated packets (PUBLISH, DISCONNECT, etc.) to the client + * - Simulating network delays or timeouts + * - Checking elapsed time and triggering time-based behaviors + * - Managing connection state transitions + * + * The callback runs on the server's event loop and is automatically scheduled: + * 1. Initially when the connection is established (runs immediately) + * 2. After each execution (reschedules itself to run 1 second later) + * + * The task is automatically cancelled when the connection is destroyed. + * + * @param mock_server The mock server connection context (provides channel, encoder, etc.) + * @param user_data User data from the vtable (mock_server_user_data from test fixture) + * + * @note This callback is optional. Set to NULL if periodic server behavior is not needed. + * @note Use aws_mqtt5_mock_server_send_packet() to send packets to the client. + */ typedef void( aws_mqtt5_mock_server_service_fn)(struct aws_mqtt5_server_mock_connection_context *mock_server, void *user_data); +/** + * Virtual function table for the mock MQTT5 server. + * + * Defines callbacks for handling client packets and performing periodic server operations. + */ struct aws_mqtt5_mock_server_vtable { + /** Handlers for packets received from the client (indexed by aws_mqtt5_packet_type) */ aws_mqtt5_on_mock_server_packet_received_fn *packet_handlers[16]; + + /** Optional periodic service callback (runs every 1 second). Can be NULL. */ aws_mqtt5_mock_server_service_fn *service_task_fn; };
