[ 
https://issues.apache.org/jira/browse/KAFKA-5825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16163954#comment-16163954
 ] 

Guozhang Wang commented on KAFKA-5825:
--------------------------------------

I looked carefully through the logs and here is what I can find:

1) The producer did successfully send the message within the transaction, and the 
coordinator's last state transition was from PrepareCommit to 
CompleteCommit:

{code}
17:44:52.908 [kafka-request-handler-2] DEBUG 
kafka.coordinator.transaction.TransactionMetadata - TransactionalId 
aa39562f-8b63-42be-8aea-61b10d7d56eb prepare transition from PrepareCommit to 
TxnTransitMetadata(producerId=2000, producerEpoch=0, txnTimeoutMs=60000, 
txnState=CompleteCommit, topicPartitions=Set(), 
txnStartTimestamp=1505061892804, txnLastUpdateTimestamp=1505061892908)
{code}

Note that this happened long after the test started, because the producer kept 
trying to find the coordinator in a loop. That is mainly because the default 
num.partitions for the transaction log is 50, which takes a long time to set up 
with a replication factor of 3.

2) In the meantime, the Streams consumer kept fetching data but could not get 
any:

{code}
17:44:53.910 [exactly-once-762724d8-71d8-4c1d-a924-970fc5a81e9f-StreamThread-1] 
DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetch 
READ_UNCOMMITTED at offset 1 for partition my-topic-0 returned fetch data 
(error=NONE, highWaterMark=1, lastStableOffset = -1, logStartOffset = 0, 
abortedTransactions = null, recordsSizeInBytes=0)
17:44:53.910 [exactly-once-762724d8-71d8-4c1d-a924-970fc5a81e9f-StreamThread-1] 
DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Added 
READ_UNCOMMITTED fetch request for partition my-topic-0 at offset 1 to node 
127.0.0.1:51751 (id: 0 rack: null)
17:44:53.910 [exactly-once-762724d8-71d8-4c1d-a924-970fc5a81e9f-StreamThread-1] 
DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Sending 
READ_UNCOMMITTED fetch for partitions [my-topic-0] to broker 127.0.0.1:51751 
(id: 0 rack: null)
{code}

So I cannot see any obvious correctness issue; I suspect the default 
num.partitions is simply too large for the transaction log to be set up in time 
in your test (note that without EOS turned on this topic would not be created 
at all).

I'd suggest changing the {{transaction.state.log.num.partitions}} config to a 
smaller number (3?), also reducing the replication factor for your testing, and 
seeing if that helps.
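For example, something like the following broker overrides should make the transaction log cheap to create on a single embedded broker (how you pass these depends on your test harness; with scalatest-embedded-kafka they would typically go into the embedded broker's custom properties):

{code}
# Fewer transaction-log partitions to create at startup (default is 50)
transaction.state.log.num.partitions=3
# A single broker cannot satisfy the default replication factor of 3
transaction.state.log.replication.factor=1
# Likewise, the default min.insync.replicas of 2 cannot be met with one broker
transaction.state.log.min.isr=1
{code}

The replication factor and min.isr overrides matter here because with only one broker the transaction log partitions can otherwise never become fully replicated.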

> Streams not processing when exactly once is set
> -----------------------------------------------
>
>                 Key: KAFKA-5825
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5825
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.11.0.0
>         Environment: EmbeddedKafka running on Windows.  Relevant files 
> attached.
>            Reporter: Ryan Worsley
>         Attachments: build.sbt, log4j.properties, log-output.txt, Tests.scala
>
>
> +Set-up+
> I'm using [EmbeddedKafka|https://github.com/manub/scalatest-embedded-kafka/] 
> for ScalaTest.
> This spins up a single broker internally on a random port.
> I've written two tests - the first without transactions, the second with.  
> They're nearly identical apart from the config and the transactional 
> semantics.  I've written the transactional version based on Neha's 
> [blog|https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/]
>  which is the closest thing I could find to instructions.
> The tests wait until a single message is processed by the streams topology, 
> they use this message to complete a promise that the test is waiting on.  
> Once the promise completes the test verifies the value of the promise as 
> being the expected value of the message.
> +Observed behaviour+
> The first test passes fine, the second test times out, the stream processor 
> never seems to read the transactional message.
> +Notes+
> I've attached my build.sbt, log4j.properties and my Tests.scala file in order 
> to make it as easy as possible for someone to re-create.  I'm running on 
> Windows and using Scala as this reflects my workplace.  I completely expect 
> there to be some configuration issue that's causing this, but am unable to 
> proceed at this time.
> Related information: 
> https://github.com/manub/scalatest-embedded-kafka/issues/82



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)