j-riebe opened a new issue, #16689:
URL: https://github.com/apache/druid/issues/16689
### Affected Version
Reproduced with:
* v29.0.1
* v30.0.0
### Description
We observed that the `KafkaSupervisor` does not enter idle mode after
processing all messages, even though no new messages are produced within the
`inactiveAfterMillis` duration.
We traced this problem down to cases where the last message was produced
inside a Kafka transaction.
In this case, the offset lag reported in the supervisor status only decreases
to a minimum of 1 instead of 0 once all messages have been processed.
This is caused by `currentOffsets` being less than `latestOffsets` in the
supervisor status.
Our assumption is that the offset lag is computed incorrectly because
`latestOffsets` may point to Kafka transaction markers (more on that in
section Mutual error cause).
**Note:** This issue **only appears** when the **last message** has been
written as part of a **Kafka transaction**. The `latestOffsets` will point to
**the offset after** the last written (possibly aborted) message, which
corresponds to the transaction marker.
When a non-transactional message is produced after an arbitrary number of
transactional messages, the lag is correctly calculated as 0 and idle mode is
entered.
### Minimal example
_The following is tested against a single topic & single partition
configuration.
See last section for a full reproduction of this behaviour._
Suppose we insert a single message into a Kafka topic as part of a
Kafka transaction.
This message will have offset 0.
Since the Kafka supervisor reports the next offset to be processed, we would
expect the supervisor to report `currentOffsets == latestOffsets == 1`.
But the status is actually reported as:
* `currentOffsets` (actually the next to-be-processed offset): 1
* `latestOffsets` (expected to be the next to-be-processed offset): 2
This leads to a lag of 1, which would indicate that there is 1 message left to
consume.
But as there are no more messages, this is unexpected behaviour.
<details><summary>Example status after single message with offset 0</summary>
<p>
```json
{
"dataSource": "test-idle-mode",
"stream": "test-idle-mode",
"partitions": 1,
"replicas": 1,
"durationSeconds": 3600,
"activeTasks": [
{
"id": "index_kafka_test-idle-mode_f982fc91af103bc_nehpmfih",
"startingOffsets": {
"0": 0
},
"startTime": "2024-07-03T15:04:34.022Z",
"remainingSeconds": 3568,
"type": "ACTIVE",
"currentOffsets": {
"0": 1
},
"lag": {
"0": 1
}
}
],
"publishingTasks": [],
"latestOffsets": {
"0": 2
},
"minimumLag": {
"0": 1
},
"aggregateLag": 1,
"offsetsLastUpdated": "2024-07-03T15:05:02.179Z",
"suspended": false,
"healthy": true,
"state": "RUNNING",
"detailedState": "RUNNING",
"recentErrors": []
}
```
</p>
</details>
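The reported lag appears to be nothing more than the per-partition difference `latestOffsets - currentOffsets`, summed into `aggregateLag`. The sketch below recomputes it from the status above (trimmed to the relevant fields) to make the arithmetic explicit; `recompute_lag` is a hypothetical helper, not part of Druid, and taking the highest `currentOffsets` across tasks is an assumption about how replicas are combined.

```python
from typing import Dict


def recompute_lag(status: dict) -> Dict[str, int]:
    """Per-partition lag: latestOffsets minus the most advanced task's currentOffsets."""
    lag = {}
    for partition, latest_offset in status["latestOffsets"].items():
        # Assumption: with replicas, the most advanced task offset counts.
        current = max(
            task["currentOffsets"][partition]
            for task in status["activeTasks"]
            if partition in task["currentOffsets"]
        )
        lag[partition] = latest_offset - current
    return lag


# Relevant fields of the status after the single committed message.
status = {
    "activeTasks": [{"currentOffsets": {"0": 1}, "lag": {"0": 1}}],
    "latestOffsets": {"0": 2},
    "aggregateLag": 1,
}

lag = recompute_lag(status)
print(lag)                # {'0': 1}
print(sum(lag.values()))  # 1, same as "aggregateLag"
```

With `latestOffsets` at 2 and nothing left to read, this difference can never reach 0.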
We also found that `latestOffsets` can be incremented further by producing
messages in an aborted transaction.
`currentOffsets` is not affected by this, as it seems to correctly
report the offset of the last processed message + 1.
But `latestOffsets` is now reported as 4, leading to a lag of 3 although there
are still no new messages to process.
<details><summary>Updated status after producing another message and
aborting transaction</summary>
<p>
```json
{
"dataSource": "test-idle-mode",
"stream": "test-idle-mode",
"partitions": 1,
"replicas": 1,
"durationSeconds": 3600,
"activeTasks": [
{
"id": "index_kafka_test-idle-mode_f982fc91af103bc_nehpmfih",
"startingOffsets": {
"0": 0
},
"startTime": "2024-07-03T15:04:34.022Z",
"remainingSeconds": 1607,
"type": "ACTIVE",
"currentOffsets": {
"0": 1
},
"lag": {
"0": 3
}
}
],
"publishingTasks": [],
"latestOffsets": {
"0": 4
},
"minimumLag": {
"0": 3
},
"aggregateLag": 3,
"offsetsLastUpdated": "2024-07-03T15:37:32.514Z",
"suspended": false,
"healthy": true,
"state": "RUNNING",
"detailedState": "RUNNING",
"recentErrors": []
}
```
</p>
</details>
We can **rule out issues with the `isolation.level`** because the
`latestOffsets` only get updated once the transaction is either committed or
aborted. So the process seems to adhere to `isolation.level = read_committed`.
By producing a new message without a transaction, it is now possible to
reduce the lag to 0.
This results in `currentOffsets == latestOffsets == 5` (after processing
just **2** valid messages!).
<details><summary>Status after producing non-transactional message</summary>
<p>
```json
{
"dataSource": "test-idle-mode",
"stream": "test-idle-mode",
"partitions": 1,
"replicas": 1,
"durationSeconds": 3600,
"activeTasks": [
{
"id": "index_kafka_test-idle-mode_f982fc91af103bc_nehpmfih",
"startingOffsets": {
"0": 0
},
"startTime": "2024-07-03T15:04:34.022Z",
"remainingSeconds": 917,
"type": "ACTIVE",
"currentOffsets": {
"0": 5
},
"lag": {
"0": 0
}
}
],
"publishingTasks": [],
"latestOffsets": {
"0": 5
},
"minimumLag": {
"0": 0
},
"aggregateLag": 0,
"offsetsLastUpdated": "2024-07-03T15:49:02.588Z",
"suspended": false,
"healthy": true,
"state": "RUNNING",
"detailedState": "RUNNING",
"recentErrors": []
}
```
</p>
</details>
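The observation that `latestOffsets` only moves once a transaction completes matches how Kafka defines the end of a partition for `read_committed` consumers: the *last stable offset* (LSO), i.e. the first offset of the earliest still-open transaction, or the high watermark when no transaction is open. The toy model below is plain Python over a list of records (no Kafka client, and not how Druid computes anything). It only illustrates that once the abort marker is written, the LSO jumps past the marker to 4, the value reported after the aborted transaction.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class Record:
    offset: int
    txn_id: Optional[str]  # producer's transactional.id; None for non-transactional records
    marker: bool = False   # True for a COMMIT/ABORT transaction marker


def high_watermark(log: List[Record]) -> int:
    """One past the last record in the partition, transaction markers included."""
    return log[-1].offset + 1 if log else 0


def last_stable_offset(log: List[Record]) -> int:
    """First offset of the earliest still-open transaction, else the high watermark."""
    open_txns: Dict[str, int] = {}
    for rec in log:
        if rec.txn_id is None:
            continue
        if rec.marker:
            open_txns.pop(rec.txn_id, None)               # marker closes the transaction
        else:
            open_txns.setdefault(rec.txn_id, rec.offset)  # remember where it started
    return min(open_txns.values(), default=high_watermark(log))


log = [Record(0, "idletest"), Record(1, "idletest", marker=True)]  # call (1), committed
log.append(Record(2, "idletest"))                                   # call (2), still open
print(high_watermark(log), last_stable_offset(log))                 # 3 2
log.append(Record(3, "idletest", marker=True))                      # call (2), aborted
print(high_watermark(log), last_stable_offset(log))                 # 4 4
```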
### Mutual error cause
Based on the sudden jumps in the offset, we repeated the experiment while
consuming the topic with `isolation.level = read_uncommitted` in a separate process.
This results in the following offsets after the respective produce calls:

| Produce call | Offset |
|---|---|
| (1) first message (transaction committed) | [0] |
| (2) failed message (transaction aborted) | [2] |
| (3) non-transactional message (no transaction) | [4] |

This raises the question of what happened to offsets 1 and 3, which is
where the Kafka transaction comes in.
Kafka uses these offsets for the **transaction markers**, which signal
whether the transaction was committed or aborted.
These records are not meant to be processed by the consumer as regular
messages.
So the topic actually looks like this:

| Produce call | Offset | Relevant position for `currentOffsets`/`latestOffsets` at produce call |
|---|---|---|
| (1) first message (transaction committed) | [0] | currentOffset (1, 2) |
| (1) [TRANSACTION COMMITTED] | [1] | latestOffset (1) |
| (2) failed message (transaction aborted) | [2] | |
| (2) [TRANSACTION ABORTED] | [3] | latestOffset (2) |
| (3) non-transactional message (no transaction) | [4] | currentOffset (3), latestOffset (3) |

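The bookkeeping in the table can be replayed with a small simulation. It is a toy model of a single partition, not Kafka or Druid code, resting on two assumptions: every transactional produce call leaves one data record plus one marker, and the reported `latestOffsets` is simply one past the last record, markers included. Under those assumptions it reproduces the three status snapshots (lag 1, 3, then 0).

```python
from typing import List

# Toy partition: list index = offset. Kinds: "data", "aborted", "COMMIT", "ABORT".


def produce(log: List[str], mode: str) -> None:
    """Append what one produce call leaves in the partition."""
    if mode == "committed":
        log += ["data", "COMMIT"]     # data record + commit marker
    elif mode == "aborted":
        log += ["aborted", "ABORT"]   # data record (never delivered) + abort marker
    elif mode == "plain":
        log += ["data"]               # non-transactional: no marker
    else:
        raise ValueError(f"unknown mode: {mode}")


def end_offset(log: List[str]) -> int:
    """Assumed latestOffsets: one past the last record, markers included."""
    return len(log)


def next_offset_after_last_data(log: List[str]) -> int:
    """currentOffsets as the task reports it: last consumed data record + 1."""
    data = [offset for offset, kind in enumerate(log) if kind == "data"]
    return data[-1] + 1 if data else 0


log: List[str] = []
for mode in ["committed", "aborted", "plain"]:
    produce(log, mode)
    latest, current = end_offset(log), next_offset_after_last_data(log)
    print(f"{mode:<9} latest={latest} current={current} lag={latest - current}")
# committed latest=2 current=1 lag=1
# aborted   latest=4 current=1 lag=3
# plain     latest=5 current=5 lag=0
```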
Unfortunately the `KafkaSupervisor` seems to use the offsets of the
transaction markers to determine the `latestOffsets`.
**The transaction marker offsets will never be processed by the ingestion
task and can therefore never be considered for the `currentOffsets`**.
This is why we are left with a partition lag > 0 even though there aren't
any messages left to process.
This makes it impossible for the current implementation of the idle mode to
detect that there are no more messages to process (if the `latestOffsets`
point to a transaction marker). The supervisor will just keep running and
spawning tasks forever.
### Possible solution
The method for determining `latestOffsets` needs to be updated so that it
determines the latest available offset that can be consumed by the indexing
task (especially ignoring transaction markers).
Unfortunately I was not able to identify the exact code sections that would
need to be adapted.
But it will very likely have something to do with
`KafkaRecordSupplier.seekToLatest`
([src](https://github.com/apache/druid/blob/586c713d12964ebe69ce222f9c5ebcc8e8c28403/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java#L135-L142))
or [where it is called by the
supervisor](https://github.com/apache/druid/blob/f290cf083a5ab045ec1a780671f7f58a5dea5652/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java#L412).
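One possible shape of such a fix, expressed against a toy model like the one above: instead of subtracting offsets, count the records between `currentOffsets` and `latestOffsets` that a `read_committed` consumer would actually deliver. `consumable_lag` and `naive_lag` are hypothetical helpers, not Druid code, and how the supervisor would learn the record types from Kafka without consuming them is left open here.

```python
from typing import List

# Toy partition: list index = offset. Kinds: "data", "aborted", "COMMIT", "ABORT".


def naive_lag(current_offset: int, latest_offset: int) -> int:
    """Lag as it appears to be computed today: a plain offset difference."""
    return latest_offset - current_offset


def consumable_lag(log: List[str], current_offset: int, latest_offset: int) -> int:
    """Hypothetical lag: records in [current, latest) a read_committed consumer would deliver."""
    return sum(1 for kind in log[current_offset:latest_offset] if kind == "data")


log = ["data", "COMMIT", "aborted", "ABORT"]  # partition after produce calls (1) and (2)
print(naive_lag(1, len(log)))                 # 3
print(consumable_lag(log, 1, len(log)))       # 0, so the partition could be treated as idle
```

Counting deliverable records rather than offsets keeps markers and aborted data out of the lag, while a genuinely new record (for example a trailing `"data"` at offset 4) still yields a lag of 1.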
### Reproduction
_Code examples (Python) have authentication details redacted. Adapt the
snippets to your setup._
1. Create an empty Kafka topic `test-idle-mode`
2. Insert a single message in a transaction -> it will have offset 0
<details><summary>[`Python`] Produce message in transaction</summary>
<p>
```python
# Produce one message in a transaction
import json
from confluent_kafka import Producer
producer = Producer({
"bootstrap.servers": "<redacted>",
# auth settings redacted
"transactional.id": "idletest"
})
producer.init_transactions()
producer.begin_transaction()
producer.produce(
topic="test-idle-mode",
key="",
value=json.dumps(
{"__time": 1577833200000, "value": 1}
),
on_delivery=print,
)
producer.commit_transaction()
producer.flush()
producer.poll(0)
```
</p>
</details>
3. Create a KafkaSupervisor based on the topic `test-idle-mode`
<details><summary>Ingestion spec</summary>
<p>
```json
{
"type": "kafka",
"spec": {
"dataSchema": {
"dataSource": "test-idle-mode",
"timestampSpec": {
"column": "__time",
"format": "millis",
"missingValue": null
},
"dimensionsSpec": {
"dimensions": [
{
"type": "long",
"name": "value",
"multiValueHandling": "SORTED_ARRAY",
"createBitmapIndex": false
}
],
"dimensionExclusions": [
"__time"
],
"includeAllDimensions": false,
"useSchemaDiscovery": false
},
"metricsSpec": [],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": {
"type": "none"
},
"rollup": false,
"intervals": []
},
"transformSpec": {
"filter": null,
"transforms": []
}
},
"ioConfig": {
"topic": "test-idle-mode",
"topicPattern": null,
"inputFormat": {
"type": "json",
"keepNullColumns": false,
"assumeNewlineDelimited": false,
"useJsonNodeReader": false
},
"replicas": 1,
"taskCount": 1,
"taskDuration": "PT3600S",
"consumerProperties": {
"bootstrap.servers": <redacted>,
"druid.dynamic.config.provider": {
"type": "environment",
"variables": {
<auth - redacted>
}
},
"idleConfig": {
"enabled": true,
"inactiveAfterMillis": 60000
}
},
"autoScalerConfig": null,
"pollTimeout": 100,
"startDelay": "PT5S",
"period": "PT30S",
"useEarliestOffset": true,
"completionTimeout": "PT1800S",
"lateMessageRejectionPeriod": null,
"earlyMessageRejectionPeriod": null,
"lateMessageRejectionStartDateTime": null,
"configOverrides": null,
"idleConfig": null,
"stopTaskCount": null,
"stream": "test-idle-mode",
"useEarliestSequenceNumber": true
},
"tuningConfig": {
"type": "kafka",
"appendableIndexSpec": {
"type": "onheap",
"preserveExistingMetrics": false
},
"maxRowsInMemory": 150000,
"maxBytesInMemory": 0,
"skipBytesInMemoryOverheadCheck": false,
"maxRowsPerSegment": 5000000,
"maxTotalRows": null,
"intermediatePersistPeriod": "PT10M",
"maxPendingPersists": 0,
"indexSpec": {
"bitmap": {
"type": "roaring"
},
"dimensionCompression": "lz4",
"stringDictionaryEncoding": {
"type": "utf8"
},
"metricCompression": "lz4",
"longEncoding": "longs"
},
"indexSpecForIntermediatePersists": {
"bitmap": {
"type": "roaring"
},
"dimensionCompression": "lz4",
"stringDictionaryEncoding": {
"type": "utf8"
},
"metricCompression": "lz4",
"longEncoding": "longs"
},
"reportParseExceptions": false,
"handoffConditionTimeout": 900000,
"resetOffsetAutomatically": false,
"segmentWriteOutMediumFactory": null,
"workerThreads": null,
"chatRetries": 8,
"httpTimeout": "PT10S",
"shutdownTimeout": "PT80S",
"offsetFetchPeriod": "PT30S",
"intermediateHandoffPeriod": "P2147483647D",
"logParseExceptions": false,
"maxParseExceptions": 2147483647,
"maxSavedParseExceptions": 0,
"numPersistThreads": 1,
"skipSequenceNumberAvailabilityCheck": false,
"repartitionTransitionDuration": "PT120S"
}
},
"context": null,
"suspended": false
}
```
</p>
</details>
4. When checking the supervisor after the task started, the
`currentOffsets` are 1 (last message offset 0 + 1 = 1)
but `latestOffsets` is 2 instead of 1.
<details><summary>[`Status`] Example status after single message with
offset 0</summary>
<p>
This is the same as in section "Minimal example".
```json
{
"dataSource": "test-idle-mode",
"stream": "test-idle-mode",
"partitions": 1,
"replicas": 1,
"durationSeconds": 3600,
"activeTasks": [
{
"id": "index_kafka_test-idle-mode_f982fc91af103bc_nehpmfih",
"startingOffsets": {
"0": 0
},
"startTime": "2024-07-03T15:04:34.022Z",
"remainingSeconds": 3568,
"type": "ACTIVE",
"currentOffsets": {
"0": 1
},
"lag": {
"0": 1
}
}
],
"publishingTasks": [],
"latestOffsets": {
"0": 2
},
"minimumLag": {
"0": 1
},
"aggregateLag": 1,
"offsetsLastUpdated": "2024-07-03T15:05:02.179Z",
"suspended": false,
"healthy": true,
"state": "RUNNING",
"detailedState": "RUNNING",
"recentErrors": []
}
```
</p>
</details>
5. Insert a single message in a transaction but abort it -> the unavailable
message will have offset 2
<details><summary>[`Python`] Produce message and abort
transaction</summary>
<p>
```python
# Produce one message in a transaction, then abort the transaction
import json
from confluent_kafka import Producer
producer = Producer({
"bootstrap.servers": "<redacted>",
# auth settings redacted
"transactional.id": "idletest"
})
producer.init_transactions()
producer.begin_transaction()
producer.produce(
topic="test-idle-mode",
key="",
value=json.dumps(
{"__time": 1577833201000, "value": 1}
),
on_delivery=print,
)
# Make sure message is sent to broker before aborting the transaction
producer.flush()
producer.poll(0)
producer.abort_transaction()
producer.flush()
producer.poll(0)
```
</p>
</details>
6. The supervisor status still reports `currentOffsets == 1` but
`latestOffsets` is now 4.
<details><summary>[`Status`] Updated status after producing another
message and aborting transaction</summary>
<p>
```json
{
"dataSource": "test-idle-mode",
"stream": "test-idle-mode",
"partitions": 1,
"replicas": 1,
"durationSeconds": 3600,
"activeTasks": [
{
"id": "index_kafka_test-idle-mode_f982fc91af103bc_nehpmfih",
"startingOffsets": {
"0": 0
},
"startTime": "2024-07-03T15:04:34.022Z",
"remainingSeconds": 1607,
"type": "ACTIVE",
"currentOffsets": {
"0": 1
},
"lag": {
"0": 3
}
}
],
"publishingTasks": [],
"latestOffsets": {
"0": 4
},
"minimumLag": {
"0": 3
},
"aggregateLag": 3,
"offsetsLastUpdated": "2024-07-03T15:37:32.514Z",
"suspended": false,
"healthy": true,
"state": "RUNNING",
"detailedState": "RUNNING",
"recentErrors": []
}
```
</p>
</details>
7. Produce a non-transactional message (will have offset 4)
<details><summary>[`Python`] Produce message without
transaction</summary>
<p>
```python
# Produce one message without transaction
import json
from confluent_kafka import Producer
producer = Producer({
"bootstrap.servers": "<redacted>",
# auth settings redacted
})
producer.produce(
topic="test-idle-mode",
key="",
value=json.dumps(
{"__time": 1577833202000, "value": 1}
),
on_delivery=print,
)
producer.flush()
producer.poll(0)
```
</p>
</details>
8. New supervisor status with `currentOffsets == latestOffsets == 5` (now
finally able to enter idle mode)
<details><summary>[`Status`] Status after producing non-transactional
message</summary>
<p>
```json
{
"dataSource": "test-idle-mode",
"stream": "test-idle-mode",
"partitions": 1,
"replicas": 1,
"durationSeconds": 3600,
"activeTasks": [
{
"id": "index_kafka_test-idle-mode_f982fc91af103bc_nehpmfih",
"startingOffsets": {
"0": 0
},
"startTime": "2024-07-03T15:04:34.022Z",
"remainingSeconds": 917,
"type": "ACTIVE",
"currentOffsets": {
"0": 5
},
"lag": {
"0": 0
}
}
],
"publishingTasks": [],
"latestOffsets": {
"0": 5
},
"minimumLag": {
"0": 0
},
"aggregateLag": 0,
"offsetsLastUpdated": "2024-07-03T15:49:02.588Z",
"suspended": false,
"healthy": true,
"state": "RUNNING",
"detailedState": "RUNNING",
"recentErrors": []
}
```
</p>
</details>
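To watch the offsets while going through the steps above, the supervisor status can also be polled from a script. This sketch assumes the Druid router is reachable at `http://localhost:8888` without authentication and that the supervisor id equals the datasource name (`test-idle-mode`); `fetch_supervisor_status` and `offsets_summary` are hypothetical helpers around the supervisor status endpoint `GET /druid/indexer/v1/supervisor/<id>/status`, whose response wraps the status shown above in a `payload` field.

```python
import json
import urllib.request


def fetch_supervisor_status(base_url: str, supervisor_id: str) -> dict:
    """GET the supervisor status report (no auth handling; add it for your setup)."""
    url = f"{base_url}/druid/indexer/v1/supervisor/{supervisor_id}/status"
    with urllib.request.urlopen(url, timeout=10) as resp:
        return json.load(resp)


def offsets_summary(report: dict) -> str:
    """One-line summary of state, offsets and lag; accepts the full report or the bare payload."""
    payload = report.get("payload", report)
    current = {}
    for task in payload.get("activeTasks", []):
        for partition, offset in task.get("currentOffsets", {}).items():
            # Keep the most advanced offset if several replicas report the partition.
            current[partition] = max(offset, current.get(partition, offset))
    return (
        f"state={payload['detailedState']} current={current} "
        f"latest={payload['latestOffsets']} aggregateLag={payload['aggregateLag']}"
    )


# Usage (requires a running cluster; the URL is an assumption):
# print(offsets_summary(fetch_supervisor_status("http://localhost:8888", "test-idle-mode")))
```

After step 4 this should print something like `state=RUNNING current={'0': 1} latest={'0': 2} aggregateLag=1`, mirroring the status above.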