[
https://issues.apache.org/jira/browse/KAFKA-12468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Greg Harris reassigned KAFKA-12468:
-----------------------------------
Assignee: Greg Harris
> Initial offsets are copied from source to target cluster
> --------------------------------------------------------
>
> Key: KAFKA-12468
> URL: https://issues.apache.org/jira/browse/KAFKA-12468
> Project: Kafka
> Issue Type: Bug
> Components: mirrormaker
> Affects Versions: 2.7.0
> Reporter: Bart De Neuter
> Assignee: Greg Harris
> Priority: Major
>
> We have an active-passive setup where the 3 connectors from mirror maker 2
> (heartbeat, checkpoint and source) are running on a dedicated Kafka connect
> cluster on the target cluster.
> Offset syncing is enabled as specified by KIP-545. But when activated, it
> seems the offsets from the source cluster are initially copied to the target
> cluster without translation. This causes a negative lag for all synced
> consumer groups. Only when we reset the offsets for each topic/partition on
> the target cluster and produce a record on the topic/partition in the source,
> the sync starts working correctly.
> I would expect that the consumer groups are synced but that the current
> offsets of the source cluster are not copied to the target cluster.
> This is the configuration we are currently using:
> Heartbeat connector
>
> {code:xml}
> {
> "name": "mm2-mirror-heartbeat",
> "config": {
> "name": "mm2-mirror-heartbeat",
> "connector.class":
> "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
> "source.cluster.alias": "eventador",
> "target.cluster.alias": "msk",
> "source.cluster.bootstrap.servers": "<SOURCE_CLUSTER>",
> "target.cluster.bootstrap.servers": "<TARGET_CLUSTER>",
> "topics": ".*",
> "groups": ".*",
> "tasks.max": "1",
> "replication.policy.class": "CustomReplicationPolicy",
> "sync.group.offsets.enabled": "true",
> "sync.group.offsets.interval.seconds": "5",
> "emit.checkpoints.enabled": "true",
> "emit.checkpoints.interval.seconds": "30",
> "emit.heartbeats.interval.seconds": "30",
> "key.converter": "
> org.apache.kafka.connect.converters.ByteArrayConverter",
> "value.converter":
> "org.apache.kafka.connect.converters.ByteArrayConverter"
> }
> }
> {code}
> Checkpoint connector:
> {code:xml}
> {
> "name": "mm2-mirror-checkpoint",
> "config": {
> "name": "mm2-mirror-checkpoint",
> "connector.class":
> "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
> "source.cluster.alias": "eventador",
> "target.cluster.alias": "msk",
> "source.cluster.bootstrap.servers": "<SOURCE_CLUSTER>",
> "target.cluster.bootstrap.servers": "<TARGET_CLUSTER>",
> "topics": ".*",
> "groups": ".*",
> "tasks.max": "40",
> "replication.policy.class": "CustomReplicationPolicy",
> "sync.group.offsets.enabled": "true",
> "sync.group.offsets.interval.seconds": "5",
> "emit.checkpoints.enabled": "true",
> "emit.checkpoints.interval.seconds": "30",
> "emit.heartbeats.interval.seconds": "30",
> "key.converter": "
> org.apache.kafka.connect.converters.ByteArrayConverter",
> "value.converter":
> "org.apache.kafka.connect.converters.ByteArrayConverter"
> }
> }
> {code}
> Source connector:
> {code:xml}
> {
> "name": "mm2-mirror-source",
> "config": {
> "name": "mm2-mirror-source",
> "connector.class":
> "org.apache.kafka.connect.mirror.MirrorSourceConnector",
> "source.cluster.alias": "eventador",
> "target.cluster.alias": "msk",
> "source.cluster.bootstrap.servers": "<SOURCE_CLUSTER>",
> "target.cluster.bootstrap.servers": "<TARGET_CLUSTER>",
> "topics": ".*",
> "groups": ".*",
> "tasks.max": "40",
> "replication.policy.class": "CustomReplicationPolicy",
> "sync.group.offsets.enabled": "true",
> "sync.group.offsets.interval.seconds": "5",
> "emit.checkpoints.enabled": "true",
> "emit.checkpoints.interval.seconds": "30",
> "emit.heartbeats.interval.seconds": "30",
> "key.converter": "
> org.apache.kafka.connect.converters.ByteArrayConverter",
> "value.converter":
> "org.apache.kafka.connect.converters.ByteArrayConverter"
> }
> }
> {code}
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)