ramackri opened a new pull request, #1001:
URL: https://github.com/apache/ranger/pull/1001
## What changes were proposed in this pull request?
This change closes Kafka producer and consumer configuration gaps in the
audit-server stack:
- **Ingestor producer** — wire `batch.size`, `linger.ms`, `buffer.memory`,
`compression.type`, timeouts, and related `ProducerConfig` keys from site XML;
`flush()` on shutdown; configurable REST batch send timeout.
- **Dispatchers** — default to `CooperativeStickyAssignor`; document and
ship consumer rebalance/poll timeout properties in Solr and HDFS site XML.
- **Topic creation** — optionally apply `retention.ms`, `compression.type`,
and `min.insync.replicas` when Ranger creates the `ranger_audits` topic.
All new properties are **backward compatible**: code defaults apply when
site XML omits them.
---
## End-to-end flow (Tier 3)
```text
Ranger plugins --REST--> audit-ingestor (:7081) --produce--> Kafka (ranger_audits)
                                                                    |
                     +----------------------------------------------+
                     |                                              |
                     v                                              v
    ranger_audit_solr_dispatcher_group             ranger_audit_hdfs_dispatcher_group
                     |                                              |
                     v                                              v
           Solr (ranger_audits)                       HDFS (/ranger/audit/hdfs/...)
                     |
                     v
     Ranger Admin → Audit → Access tab
```
---
## Gap analysis (before this change)
### Producer (`AuditProducer.java`)
| Gap | Before | Risk |
|-----|--------|------|
| P1 producer props not wired | `batch.size`, `linger.ms`, `buffer.memory`, `compression.type`, `delivery.timeout.ms` hard-coded or left at Kafka client defaults (16 KiB `batch.size`, `linger.ms=0` in the 3.x client, no compression) | Lower throughput under burst; higher broker disk and network usage; no operator tuning via XML |
| `request.timeout.ms` in XML but not applied to the producer | Property existed in site XML but was only used by the AdminClient for topic initialization | Producer used the Kafka default, inconsistent with the documented config |
| `sendBatch` timeout hard-coded | Fixed 30 s latch in code | Not adjustable for large batches or slow brokers |
| No `flush()` on shutdown | `kafkaProducer.close()` only | In-flight records may be lost on graceful stop |
| Topic configs at create time | `retention.ms`, `compression.type`, `min.insync.replicas` not applied by Ranger | Operators had to set topic configs out-of-band |
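The hard-coded `sendBatch` wait is essentially a latch over per-record send callbacks. The sketch below shows that pattern with the timeout passed in rather than fixed at 30 s. It is a stdlib-only illustration, not Ranger's code. `AsyncSender`, `BatchSendSketch`, and the `sendBatch` signature are all hypothetical. `AsyncSender` stands in for `KafkaProducer.send(record, callback)`, whose real callback receives `(RecordMetadata, Exception)`.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical stand-in for KafkaProducer.send(record, callback): the sender
// invokes the callback with null on success or an exception on failure.
interface AsyncSender {
    void send(String record, Consumer<Exception> callback);
}

class BatchSendSketch {
    /**
     * Sends every record, then waits up to timeoutMs for all callbacks.
     * Returns the number of records acknowledged without error, or -1 if the
     * wait timed out (or was interrupted) with callbacks still outstanding.
     */
    static int sendBatch(List<String> records, AsyncSender sender, long timeoutMs) {
        CountDownLatch latch = new CountDownLatch(records.size());
        AtomicInteger acked = new AtomicInteger();
        for (String record : records) {
            sender.send(record, err -> {
                if (err == null) {
                    acked.incrementAndGet();
                }
                latch.countDown(); // count down on success and failure alike
            });
        }
        try {
            // Bounded by a configurable timeout instead of a fixed 30 s.
            if (!latch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
                return -1;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
            return -1;
        }
        return acked.get();
    }
}
```

Returning `-1` on timeout is only one possible convention. A real REST handler might instead map a timeout to an error response so the plugin can retry.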
### Consumer (Solr / HDFS dispatchers)
| Gap | Before | Risk |
|-----|--------|------|
| Default partition assignor | Kafka client default (`RangeAssignor`) | Stop-the-world (eager) rebalance on pod churn; more partition movement in K8s/Docker |
| Consumer rebalance / poll timeouts | Partially documented; not consistently set in site XML | `max.poll.interval.ms` exceeded when Solr batches are slow, so the consumer is removed from the group |
| `CooperativeStickyAssignor` not default | Not configured | Incremental cooperative rebalancing unused even though the Kafka 3.9 client supports it |
---
## Code changes
| File | Change |
|------|--------|
| `audit-common/.../AuditServerConstants.java` | Producer property names and defaults; topic config property names; default assignor → `CooperativeStickyAssignor` |
| `audit-ingestor/.../AuditProducer.java` | `createProducerConfig()` wires all P1/P2 producer props; configurable batch send timeout; `flush()` on shutdown |
| `audit-common/.../AuditMessageQueueUtils.java` | `buildTopicConfigs()` applies optional topic configs at create time |
| `ranger-audit-ingestor-site.xml` | Producer tuning block + commented topic configs for production |
| `ranger-audit-dispatcher-solr-site.xml` | Consumer rebalance/timeout properties + `CooperativeStickyAssignor` |
| `ranger-audit-dispatcher-hdfs-site.xml` | Consumer rebalance/timeout properties + `CooperativeStickyAssignor` |
| `AuditProducerTest.java` | Unit tests for `createProducerConfig()` |
| `AuditMessageQueueUtilsTest.java` | Unit tests for `buildTopicConfigs()` |
---
## Configuration reference
### Ingestor producer
**Prefix:** `ranger.audit.ingestor.kafka.producer.*`
**Config file:**
`audit-ingestor/src/main/resources/conf/ranger-audit-ingestor-site.xml`
| Property | Default | Kafka key | Notes |
|----------|---------|-----------|-------|
| `...producer.batch.size` | `131072` (128 KiB) | `batch.size` | 65536–262144 for high volume |
| `...producer.linger.ms` | `20` | `linger.ms` | Coalesces records into larger batches |
| `...producer.buffer.memory` | `134217728` (128 MiB) | `buffer.memory` | Headroom so `send()` rarely blocks for up to `max.block.ms` |
| `...producer.compression.type` | `lz4` | `compression.type` | ~40–50% savings on JSON audit payloads |
| `...producer.delivery.timeout.ms` | `120000` | `delivery.timeout.ms` | Must be ≥ `request.timeout.ms` + `linger.ms` |
| `...producer.max.request.size` | `1048576` (1 MiB) | `max.request.size` | Must be ≤ broker `message.max.bytes` |
| `...producer.max.block.ms` | `60000` | `max.block.ms` | Maximum time `send()` blocks when the buffer is full or metadata is unavailable |
| `...producer.batch.send.timeout.ms` | `30000` | *(application)* | REST batch callback wait (not a Kafka key) |
| `...kafka.request.timeout.ms` | `60000` | `request.timeout.ms` | Shared by producer and AdminClient |
Producer also sets `enable.idempotence=true` and `acks=all` in code (not
overridable via these properties).
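Here is a minimal sketch of how a `createProducerConfig()`-style method could map the `ranger.audit.ingestor.kafka.producer.*` properties onto Kafka keys with the defaults above. It uses plain string keys and `java.util.Properties` so it runs without `kafka-clients`; real code would more likely use the `ProducerConfig` constants. `ProducerConfigSketch` and its parameters are hypothetical. The request timeout is passed in directly rather than read from site XML, and the site XML is assumed to be already parsed into a `Map<String, String>`.

```java
import java.util.Map;
import java.util.Properties;

class ProducerConfigSketch {
    // Site-XML prefix documented in the table above.
    static final String PREFIX = "ranger.audit.ingestor.kafka.producer.";

    // Kafka key -> default used when the site property is absent or blank.
    static final Map<String, String> DEFAULTS = Map.of(
            "batch.size", "131072",
            "linger.ms", "20",
            "buffer.memory", "134217728",
            "compression.type", "lz4",
            "delivery.timeout.ms", "120000",
            "max.request.size", "1048576",
            "max.block.ms", "60000");

    static Properties createProducerConfig(Map<String, String> site, long requestTimeoutMs) {
        Properties props = new Properties();
        for (Map.Entry<String, String> e : DEFAULTS.entrySet()) {
            String v = site.get(PREFIX + e.getKey());
            props.setProperty(e.getKey(), (v == null || v.isBlank()) ? e.getValue() : v.trim());
        }
        props.setProperty("request.timeout.ms", Long.toString(requestTimeoutMs));
        // Fixed in code; not overridable from site XML.
        props.setProperty("enable.idempotence", "true");
        props.setProperty("acks", "all");

        // The Kafka producer also rejects an explicitly set delivery.timeout.ms
        // below linger.ms + request.timeout.ms; checking here fails before any
        // client is constructed.
        long delivery = Long.parseLong(props.getProperty("delivery.timeout.ms"));
        long linger = Long.parseLong(props.getProperty("linger.ms"));
        if (delivery < linger + requestTimeoutMs) {
            throw new IllegalArgumentException(
                    "delivery.timeout.ms must be >= linger.ms + request.timeout.ms");
        }
        return props;
    }
}
```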
### Optional topic configs (at create time)
**Prefix:** `ranger.audit.ingestor.kafka.topic.*`
Applied only when set and non-blank. Uncomment in site XML for production
clusters with RF ≥ 3.
| Property | Suggested value | Kafka key |
|----------|-----------------|-----------|
| `...topic.retention.ms` | `604800000` (7 days) | `retention.ms` |
| `...topic.compression.type` | `lz4` | `compression.type` |
| `...topic.min.insync.replicas` | `2` | `min.insync.replicas` |
Requires replication factor ≥ min ISR.
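Optional topic configs only need to reach the broker when an operator has set them. Below is a sketch of a `buildTopicConfigs()`-style helper under that rule. It assumes site properties arrive as a `Map<String, String>`. The signature and the min-ISR guard are illustrative assumptions, not the PR's actual code.

```java
import java.util.HashMap;
import java.util.Map;

class TopicConfigSketch {
    static final String PREFIX = "ranger.audit.ingestor.kafka.topic.";
    static final String[] KEYS = {"retention.ms", "compression.type", "min.insync.replicas"};

    /** Returns only the topic configs that are set and non-blank in site XML. */
    static Map<String, String> buildTopicConfigs(Map<String, String> site, short replicationFactor) {
        Map<String, String> configs = new HashMap<>();
        for (String key : KEYS) {
            String v = site.get(PREFIX + key);
            if (v != null && !v.isBlank()) {
                configs.put(key, v.trim());
            }
        }
        // Guards the "replication factor >= min ISR" requirement stated above.
        String minIsr = configs.get("min.insync.replicas");
        if (minIsr != null && Integer.parseInt(minIsr) > replicationFactor) {
            throw new IllegalArgumentException("min.insync.replicas exceeds replication factor");
        }
        return configs;
    }
}
```

With `kafka-clients` on the classpath, the returned map could be passed to `NewTopic.configs(Map)` before calling `Admin.createTopics(...)`.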
### Dispatcher consumers (Solr and HDFS)
**Prefix:** `ranger.audit.dispatcher.*`
**Config files:**
- `audit-dispatcher/dispatcher-solr/src/main/resources/conf/ranger-audit-dispatcher-solr-site.xml`
- `audit-dispatcher/dispatcher-hdfs/src/main/resources/conf/ranger-audit-dispatcher-hdfs-site.xml`
| Property | Default | Notes |
|----------|---------|-------|
| `partition.assignment.strategy` | `org.apache.kafka.clients.consumer.CooperativeStickyAssignor` | Incremental cooperative rebalance on pod churn |
| `session.timeout.ms` | `60000` | Must exceed `heartbeat.interval.ms`; keep it ≥ 3 × `heartbeat.interval.ms` |
| `heartbeat.interval.ms` | `10000` | Consumer liveness while idle |
| `max.poll.interval.ms` | `300000` (5 min) | Increase if Solr indexing or HDFS writes exceed this window |
| `max.poll.records` | `500` | Reduce to ~200 if `max.poll.interval.ms` is exceeded |
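The dispatcher settings in the table above map directly onto Kafka consumer properties. The sketch below assumes the values have already been read from the `ranger.audit.dispatcher.*` site properties. `ConsumerConfigSketch` is hypothetical. The one-third heartbeat check follows Kafka's documented guidance rather than a hard client rule.

```java
import java.util.Properties;

class ConsumerConfigSketch {
    static final String COOPERATIVE =
            "org.apache.kafka.clients.consumer.CooperativeStickyAssignor";

    static Properties dispatcherConsumerConfig(long sessionMs, long heartbeatMs,
                                               long maxPollIntervalMs, int maxPollRecords) {
        // Kafka guidance: heartbeat.interval.ms no higher than 1/3 of session.timeout.ms.
        if (heartbeatMs * 3 > sessionMs) {
            throw new IllegalArgumentException(
                    "session.timeout.ms should be >= 3 x heartbeat.interval.ms");
        }
        Properties p = new Properties();
        p.setProperty("partition.assignment.strategy", COOPERATIVE);
        p.setProperty("session.timeout.ms", Long.toString(sessionMs));
        p.setProperty("heartbeat.interval.ms", Long.toString(heartbeatMs));
        p.setProperty("max.poll.interval.ms", Long.toString(maxPollIntervalMs));
        p.setProperty("max.poll.records", Integer.toString(maxPollRecords));
        return p;
    }
}
```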
---
## Tuning guide
### High audit volume (producer)
| Goal | Action |
|------|--------|
| Higher throughput | Increase `batch.size` (up to 262144) and tune `linger.ms` within 10–50 |
| Lower broker disk/network usage | Keep `compression.type=lz4` |
| Avoid producer blocking | Keep `buffer.memory` at 64–128 MiB or more (default 128 MiB) |
| Slow broker / large batches | Raise `delivery.timeout.ms` and `batch.send.timeout.ms` together |
### Slow Solr or HDFS batches (consumer)
| Symptom | Action |
|---------|--------|
| Consumer leaves group (`max.poll.interval.ms` exceeded) | Increase `max.poll.interval.ms` to 1.5–2× p99 batch processing time |
| Frequent rebalance on deploy | Keep `CooperativeStickyAssignor`; expect one rebalance on first upgrade from `RangeAssignor` |
| Long GC pauses | Reduce `max.poll.records` |
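The sizing rule in the first row is simple arithmetic. The sketch below computes the 1.5–2× range from a measured p99 batch time. It also gives a rough `max.poll.records` ceiling, assuming (this PR does not state it) that batch time grows linearly with record count. `PollIntervalSizing` is a hypothetical helper.

```java
class PollIntervalSizing {
    /** Recommended max.poll.interval.ms range: 1.5x to 2x the p99 time to process one poll's batch. */
    static long[] recommendedRange(long p99BatchMs) {
        return new long[] {Math.round(p99BatchMs * 1.5), p99BatchMs * 2};
    }

    /**
     * Largest max.poll.records whose estimated batch time (records x perRecordMs x safetyFactor)
     * stays within maxPollIntervalMs. Assumes batch time scales linearly with record count.
     */
    static int maxRecordsFor(long maxPollIntervalMs, double perRecordMs, double safetyFactor) {
        return (int) Math.floor(maxPollIntervalMs / (perRecordMs * safetyFactor));
    }
}
```

For example, a p99 of 240 s gives a range of 360000–480000 ms, so the 300000 ms default would be too low. Alternatively, at an assumed 800 ms per record with a 1.5× margin, 300000 ms fits at most 250 records per poll.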
### Production Kafka cluster (RF = 3)
Uncomment topic properties in `ranger-audit-ingestor-site.xml`:
```xml
<property>
  <name>ranger.audit.ingestor.kafka.topic.retention.ms</name>
  <value>604800000</value>
</property>
<property>
  <name>ranger.audit.ingestor.kafka.topic.compression.type</name>
  <value>lz4</value>
</property>
<property>
  <name>ranger.audit.ingestor.kafka.topic.min.insync.replicas</name>
  <value>2</value>
</property>
```
---
## Impact and rollback
| Component | Impact |
|-----------|--------|
| **Audit ingestor** | Higher throughput under burst; lz4 compression; `flush()` on graceful shutdown. Defaults apply when properties are omitted. |
| **Solr / HDFS dispatchers** | Cooperative sticky rebalancing; fewer disruptive partition revocations on container restart. One rebalance is possible on first upgrade. |
| **Topic creation** | Optional retention, compression, and min ISR applied only when the properties are set. |
| **Rollback** | Restore the previous site XML and redeploy audit pods; no schema migration. |
---
## How was this patch tested?
### Unit tests
```bash
mvn test -pl audit-server/audit-common,audit-server/audit-ingestor \
-Dcheckstyle.skip=true -Dpmd.skip=true -Drat.skip=true
```
| Test class | Coverage |
|------------|----------|
| `AuditProducerTest` | `createProducerConfig()`: defaults, overrides, idempotence/acks wiring |
| `AuditMessageQueueUtilsTest` | `buildTopicConfigs()`: retention, compression, min ISR |
Compile (without full distro):
```bash
mvn compile \
  -pl audit-server/audit-common,audit-server/audit-ingestor,audit-server/audit-dispatcher/dispatcher-common \
  -DskipTests -Drat.skip=true
```
### Manual — Docker Tier 3 pipeline
**Environment:** Docker Compose, Kerberos, Tier 3 stack (Admin, Postgres,
Kafka, ZooKeeper, Solr, Hadoop/HDFS plugin, audit ingestor, Solr dispatcher,
HDFS dispatcher).
| Step | Expected result |
|------|-----------------|
| Ingestor health (`:7081`) | UP |
| Solr dispatcher health (`:7091`) | UP |
| HDFS dispatcher health (`:7092`) | UP |
| HDFS access as Kerberos user | Solr `numFound` increases for that user |
| Ranger Admin → Audit → Access | New event visible; matches Solr document |
| HDFS audit path | Files under `/ranger/audit/hdfs/YYYYMMDD/` |
| Kafka broker restart | Pipeline blocks during the outage; recovers after the broker is healthy |
## Build and deploy notes
Tarballs are produced by the audit-server assembly modules. A full `mvn package -pl distro` may fail on unrelated modules (for example, Kylin), so build the audit-server modules directly:
```bash
mvn package \
  -pl audit-server/audit-ingestor,audit-server/audit-dispatcher/dispatcher-solr,audit-server/audit-dispatcher/dispatcher-hdfs \
  -am -DskipTests -Drat.skip=true
```
After changing site XML, redeploy the affected audit pod (ingestor and/or
dispatchers) for properties to take effect. Producer and consumer client
properties are read at JVM startup.
--
This is an automated message from the Apache Git Service.