Repository: kafka
Updated Branches:
  refs/heads/trunk 5b36adde4 -> c69842336
KAFKA-5126: Implement KIP-98 transactional methods in the MockProducer Author: Matthias J. Sax <[email protected]> Reviewers: Damian Guy, Guozhang Wang Closes #2951 from mjsax/kafka-5126-add-transactions-to-mock-producer Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c6984233 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c6984233 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c6984233 Branch: refs/heads/trunk Commit: c69842336d87cc321a58171c517c46cdddfe1a64 Parents: 5b36add Author: Matthias J. Sax <[email protected]> Authored: Tue May 9 10:15:40 2017 -0700 Committer: Guozhang Wang <[email protected]> Committed: Tue May 9 10:15:40 2017 -0700 ---------------------------------------------------------------------- .../kafka/clients/producer/MockProducer.java | 164 ++++++- .../clients/producer/MockProducerTest.java | 468 ++++++++++++++++++- 2 files changed, 592 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/c6984233/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java index 1b4151c..15ea454 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java @@ -51,12 +51,20 @@ public class MockProducer<K, V> implements Producer<K, V> { private final Cluster cluster; private final Partitioner partitioner; private final List<ProducerRecord<K, V>> sent; + private final List<ProducerRecord<K, V>> uncommittedSends; private final Deque<Completion> completions; - private boolean autoComplete; - private Map<TopicPartition, Long> offsets; 
- private boolean closed; + private final Map<TopicPartition, Long> offsets; + private final List<Map<String, Map<TopicPartition, OffsetAndMetadata>>> consumerGroupOffsets; + private Map<String, Map<TopicPartition, OffsetAndMetadata>> uncommittedConsumerGroupOffsets; private final ExtendedSerializer<K> keySerializer; private final ExtendedSerializer<V> valueSerializer; + private boolean autoComplete; + private boolean closed; + private boolean transactionInitialized; + private boolean transactionInFlight; + private boolean transactionCommitted; + private boolean transactionAborted; + private boolean producerFenced; /** * Create a mock producer @@ -80,9 +88,12 @@ public class MockProducer<K, V> implements Producer<K, V> { this.partitioner = partitioner; this.keySerializer = ensureExtended(keySerializer); this.valueSerializer = ensureExtended(valueSerializer); - this.offsets = new HashMap<TopicPartition, Long>(); - this.sent = new ArrayList<ProducerRecord<K, V>>(); - this.completions = new ArrayDeque<Completion>(); + this.offsets = new HashMap<>(); + this.sent = new ArrayList<>(); + this.uncommittedSends = new ArrayList<>(); + this.consumerGroupOffsets = new ArrayList<>(); + this.uncommittedConsumerGroupOffsets = new HashMap<>(); + this.completions = new ArrayDeque<>(); } /** @@ -117,29 +128,94 @@ public class MockProducer<K, V> implements Producer<K, V> { this(Cluster.empty(), false, null, null, null); } - public void initTransactions() { + private <T> ExtendedSerializer<T> ensureExtended(Serializer<T> serializer) { + return serializer instanceof ExtendedSerializer ? 
(ExtendedSerializer<T>) serializer : new ExtendedSerializer.Wrapper<>(serializer); + } + @Override + public void initTransactions() { + verifyProducerState(); + if (this.transactionInitialized) { + throw new IllegalStateException("MockProducer has already been initialized for transactions."); + } + this.transactionInitialized = true; } + @Override public void beginTransaction() throws ProducerFencedException { - + verifyProducerState(); + verifyTransactionsInitialized(); + this.transactionInFlight = true; + this.transactionCommitted = false; + this.transactionAborted = false; } + @Override public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, - String consumerGroupId) throws ProducerFencedException { - + String consumerGroupId) throws ProducerFencedException { + verifyProducerState(); + verifyTransactionsInitialized(); + verifyNoTransactionInFlight(); + Map<TopicPartition, OffsetAndMetadata> uncommittedOffsets = this.uncommittedConsumerGroupOffsets.get(consumerGroupId); + if (uncommittedOffsets == null) { + uncommittedOffsets = new HashMap<>(); + this.uncommittedConsumerGroupOffsets.put(consumerGroupId, uncommittedOffsets); + } + uncommittedOffsets.putAll(offsets); } + @Override public void commitTransaction() throws ProducerFencedException { + verifyProducerState(); + verifyTransactionsInitialized(); + verifyNoTransactionInFlight(); + + flush(); + + this.sent.addAll(this.uncommittedSends); + if (!this.uncommittedConsumerGroupOffsets.isEmpty()) + this.consumerGroupOffsets.add(this.uncommittedConsumerGroupOffsets); + + this.uncommittedSends.clear(); + this.uncommittedConsumerGroupOffsets = new HashMap<>(); + this.transactionCommitted = true; + this.transactionAborted = false; + this.transactionInFlight = false; } + @Override public void abortTransaction() throws ProducerFencedException { - + verifyProducerState(); + verifyTransactionsInitialized(); + verifyNoTransactionInFlight(); + flush(); + this.uncommittedSends.clear(); + 
this.uncommittedConsumerGroupOffsets.clear(); + this.transactionCommitted = false; + this.transactionAborted = true; + this.transactionInFlight = false; } - - private <T> ExtendedSerializer<T> ensureExtended(Serializer<T> serializer) { - return serializer instanceof ExtendedSerializer ? (ExtendedSerializer<T>) serializer : new ExtendedSerializer.Wrapper<>(serializer); + + private void verifyProducerState() { + if (this.closed) { + throw new IllegalStateException("MockProducer is already closed."); + } + if (this.producerFenced) { + throw new ProducerFencedException("MockProducer is fenced."); + } + } + + private void verifyTransactionsInitialized() { + if (!this.transactionInitialized) { + throw new IllegalStateException("MockProducer hasn't been initialized for transactions."); + } + } + + private void verifyNoTransactionInFlight() { + if (!this.transactionInFlight) { + throw new IllegalStateException("There is no open transaction."); + } } /** @@ -159,6 +235,7 @@ public class MockProducer<K, V> implements Producer<K, V> { */ @Override public synchronized Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) { + verifyProducerState(); int partition = 0; if (!this.cluster.partitionsForTopic(record.topic()).isEmpty()) partition = partition(record, this.cluster); @@ -169,11 +246,17 @@ public class MockProducer<K, V> implements Producer<K, V> { Completion completion = new Completion(offset, new RecordMetadata(topicPartition, 0, offset, RecordBatch.NO_TIMESTAMP, 0, 0, 0), result, callback); - this.sent.add(record); + + if (!this.transactionInFlight) + this.sent.add(record); + else + this.uncommittedSends.add(record); + if (autoComplete) completion.complete(null); else this.completions.addLast(completion); + return future; } @@ -212,29 +295,68 @@ public class MockProducer<K, V> implements Producer<K, V> { @Override public void close(long timeout, TimeUnit timeUnit) { - if (closed) { - throw new IllegalStateException("MockedProducer is already 
closed."); + if (this.closed) { + throw new IllegalStateException("MockProducer is already closed."); } - closed = true; + if (transactionInFlight) + abortTransaction(); + this.closed = true; } public boolean closed() { - return closed; + return this.closed; + } + + public void fenceProducer() { + verifyProducerState(); + verifyTransactionsInitialized(); + this.producerFenced = true; + } + + public boolean transactionInitialized() { + return this.transactionInitialized; + } + + public boolean transactionInFlight() { + return this.transactionInFlight; + } + + public boolean transactionCommitted() { + return this.transactionCommitted; + } + + public boolean transactionAborted() { + return this.transactionAborted; } /** * Get the list of sent records since the last call to {@link #clear()} */ public synchronized List<ProducerRecord<K, V>> history() { - return new ArrayList<ProducerRecord<K, V>>(this.sent); + return new ArrayList<>(this.sent); } /** - * Clear the stored history of sent records + * Get the list of committed consumer group offsets since the last call to {@link #clear()} + */ + public synchronized List<Map<String, Map<TopicPartition, OffsetAndMetadata>>> consumerGroupOffsetsHistory() { + return new ArrayList<>(this.consumerGroupOffsets); + } + /** + * + * Clear the stored history of sent records, consumer group offsets, and transactional state */ public synchronized void clear() { this.sent.clear(); + this.uncommittedSends.clear(); this.completions.clear(); + this.consumerGroupOffsets.clear(); + this.uncommittedConsumerGroupOffsets.clear(); + this.transactionInitialized = false; + this.transactionInFlight = false; + this.transactionCommitted = false; + this.transactionAborted = false; + this.producerFenced = false; } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/c6984233/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java ---------------------------------------------------------------------- diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java index a29b881..468ea49 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java @@ -16,41 +16,51 @@ */ package org.apache.kafka.clients.producer; -import static java.util.Arrays.asList; -import static java.util.Collections.singletonList; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; - +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.internals.DefaultPartitioner; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.test.MockSerializer; import org.junit.Test; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + public class MockProducerTest { - private String topic = "topic"; + 
private final String topic = "topic"; + private final MockProducer<byte[], byte[]> producer = new MockProducer<>(true, new MockSerializer(), new MockSerializer()); + private final ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(topic, "key1".getBytes(), "value1".getBytes()); + private final ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(topic, "key2".getBytes(), "value2".getBytes()); + @Test @SuppressWarnings("unchecked") public void testAutoCompleteMock() throws Exception { - MockProducer<byte[], byte[]> producer = new MockProducer<>(true, new MockSerializer(), new MockSerializer()); - ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, "key".getBytes(), "value".getBytes()); - Future<RecordMetadata> metadata = producer.send(record); + Future<RecordMetadata> metadata = producer.send(record1); assertTrue("Send should be immediately complete", metadata.isDone()); assertFalse("Send should be successful", isError(metadata)); assertEquals("Offset should be 0", 0L, metadata.get().offset()); assertEquals(topic, metadata.get().topic()); - assertEquals("We should have the record in our history", singletonList(record), producer.history()); + assertEquals("We should have the record in our history", singletonList(record1), producer.history()); producer.clear(); assertEquals("Clear should erase our history", 0, producer.history().size()); } @@ -72,8 +82,6 @@ public class MockProducerTest { @Test public void testManualCompletion() throws Exception { MockProducer<byte[], byte[]> producer = new MockProducer<>(false, new MockSerializer(), new MockSerializer()); - ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(topic, "key1".getBytes(), "value1".getBytes()); - ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(topic, "key2".getBytes(), "value2".getBytes()); Future<RecordMetadata> md1 = producer.send(record1); assertFalse("Send shouldn't have completed", md1.isDone()); Future<RecordMetadata> md2 = 
producer.send(record2); @@ -98,6 +106,428 @@ public class MockProducerTest { assertTrue("Requests should be completed.", md3.isDone() && md4.isDone()); } + @Test + public void shouldInitTransactions() { + producer.initTransactions(); + assertTrue(producer.transactionInitialized()); + } + + @Test + public void shouldThrowOnInitTransactionIfProducerAlreadyInitializedForTransactions() { + producer.initTransactions(); + try { + producer.initTransactions(); + fail("Should have thrown as producer is already initialized"); + } catch (IllegalStateException e) { } + } + + @Test(expected = IllegalStateException.class) + public void shouldThrowOnBeginTransactionIfTransactionsNotInitialized() { + producer.beginTransaction(); + } + + @Test + public void shouldBeginTransactions() { + producer.initTransactions(); + producer.beginTransaction(); + assertTrue(producer.transactionInFlight()); + } + + @Test(expected = IllegalStateException.class) + public void shouldThrowOnSendOffsetsToTransactionIfTransactionsNotInitialized() { + producer.sendOffsetsToTransaction(null, null); + } + + @Test + public void shouldThrowOnSendOffsetsToTransactionTransactionIfNoTransactionGotStarted() { + producer.initTransactions(); + try { + producer.sendOffsetsToTransaction(null, null); + fail("Should have thrown as producer has no open transaction"); + } catch (IllegalStateException e) { } + } + + @Test(expected = IllegalStateException.class) + public void shouldThrowOnCommitIfTransactionsNotInitialized() { + producer.commitTransaction(); + } + + @Test + public void shouldThrowOnCommitTransactionIfNoTransactionGotStarted() { + producer.initTransactions(); + try { + producer.commitTransaction(); + fail("Should have thrown as producer has no open transaction"); + } catch (IllegalStateException e) { } + } + + @Test + public void shouldCommitEmptyTransaction() { + producer.initTransactions(); + producer.beginTransaction(); + producer.commitTransaction(); + assertFalse(producer.transactionInFlight()); + 
assertTrue(producer.transactionCommitted()); + assertFalse(producer.transactionAborted()); + } + + @Test(expected = IllegalStateException.class) + public void shouldThrowOnAbortIfTransactionsNotInitialized() { + producer.abortTransaction(); + } + + @Test + public void shouldThrowOnAbortTransactionIfNoTransactionGotStarted() { + producer.initTransactions(); + try { + producer.abortTransaction(); + fail("Should have thrown as producer has no open transaction"); + } catch (IllegalStateException e) { } + } + + @Test + public void shouldAbortEmptyTransaction() { + producer.initTransactions(); + producer.beginTransaction(); + producer.abortTransaction(); + assertFalse(producer.transactionInFlight()); + assertTrue(producer.transactionAborted()); + assertFalse(producer.transactionCommitted()); + } + + @Test + public void shouldAbortInFlightTransactionOnClose() { + producer.initTransactions(); + producer.beginTransaction(); + producer.close(); + assertFalse(producer.transactionInFlight()); + assertTrue(producer.transactionAborted()); + assertFalse(producer.transactionCommitted()); + } + + @Test(expected = IllegalStateException.class) + public void shouldThrowFenceProducerIfTransactionsNotInitialized() { + producer.fenceProducer(); + } + + @Test + public void shouldThrowOnBeginTransactionsIfProducerGotFenced() { + producer.initTransactions(); + producer.fenceProducer(); + try { + producer.beginTransaction(); + fail("Should have thrown as producer is fenced off"); + } catch (ProducerFencedException e) { } + } + + @Test + public void shouldThrowOnSendIfProducerGotFenced() { + producer.initTransactions(); + producer.fenceProducer(); + try { + producer.send(null); + fail("Should have thrown as producer is fenced off"); + } catch (ProducerFencedException e) { } + } + + @Test + public void shouldThrowOnSendOffsetsToTransactionIfProducerGotFenced() { + producer.initTransactions(); + producer.fenceProducer(); + try { + producer.sendOffsetsToTransaction(null, null); + fail("Should 
have thrown as producer is fenced off"); + } catch (ProducerFencedException e) { } + } + + @Test + public void shouldThrowOnCommitTransactionIfProducerGotFenced() { + producer.initTransactions(); + producer.fenceProducer(); + try { + producer.commitTransaction(); + fail("Should have thrown as producer is fenced off"); + } catch (ProducerFencedException e) { } + } + + @Test + public void shouldThrowOnAbortTransactionIfProducerGotFenced() { + producer.initTransactions(); + producer.fenceProducer(); + try { + producer.abortTransaction(); + fail("Should have thrown as producer is fenced off"); + } catch (ProducerFencedException e) { } + } + + @Test + public void shouldPublishMessagesOnlyAfterCommitIfTransactionsAreEnabled() { + producer.initTransactions(); + producer.beginTransaction(); + + producer.send(record1); + producer.send(record2); + + assertTrue(producer.history().isEmpty()); + + producer.commitTransaction(); + + List<ProducerRecord<byte[], byte[]>> expectedResult = new ArrayList<>(); + expectedResult.add(record1); + expectedResult.add(record2); + + assertThat(producer.history(), equalTo(expectedResult)); + } + + @Test + public void shouldFlushOnCommitForNonAutoCompleteIfTransactionsAreEnabled() { + MockProducer<byte[], byte[]> producer = new MockProducer<>(false, new MockSerializer(), new MockSerializer()); + producer.initTransactions(); + producer.beginTransaction(); + + Future<RecordMetadata> md1 = producer.send(record1); + Future<RecordMetadata> md2 = producer.send(record2); + + assertFalse(md1.isDone()); + assertFalse(md2.isDone()); + + producer.commitTransaction(); + + assertTrue(md1.isDone()); + assertTrue(md2.isDone()); + } + + @Test + public void shouldDropMessagesOnAbortIfTransactionsAreEnabled() { + producer.initTransactions(); + + producer.beginTransaction(); + producer.send(record1); + producer.send(record2); + producer.abortTransaction(); + assertTrue(producer.history().isEmpty()); + + producer.beginTransaction(); + producer.commitTransaction(); 
+ assertTrue(producer.history().isEmpty()); + } + + @Test + public void shouldThrowOnAbortForNonAutoCompleteIfTransactionsAreEnabled() throws Exception { + MockProducer<byte[], byte[]> producer = new MockProducer<>(false, new MockSerializer(), new MockSerializer()); + producer.initTransactions(); + producer.beginTransaction(); + + Future<RecordMetadata> md1 = producer.send(record1); + assertFalse(md1.isDone()); + + producer.abortTransaction(); + assertTrue(md1.isDone()); + } + + @Test + public void shouldPreserveCommittedMessagesOnAbortIfTransactionsAreEnabled() { + producer.initTransactions(); + + producer.beginTransaction(); + producer.send(record1); + producer.send(record2); + producer.commitTransaction(); + + producer.beginTransaction(); + producer.abortTransaction(); + + List<ProducerRecord<byte[], byte[]>> expectedResult = new ArrayList<>(); + expectedResult.add(record1); + expectedResult.add(record2); + + assertThat(producer.history(), equalTo(expectedResult)); + } + + @Test + public void shouldPublishConsumerGroupOffsetsOnlyAfterCommitIfTransactionsAreEnabled() { + producer.initTransactions(); + producer.beginTransaction(); + + String group1 = "g1"; + Map<TopicPartition, OffsetAndMetadata> group1Commit = new HashMap<TopicPartition, OffsetAndMetadata>() { + { + put(new TopicPartition(topic, 0), new OffsetAndMetadata(42L, null)); + put(new TopicPartition(topic, 1), new OffsetAndMetadata(73L, null)); + } + }; + String group2 = "g2"; + Map<TopicPartition, OffsetAndMetadata> group2Commit = new HashMap<TopicPartition, OffsetAndMetadata>() { + { + put(new TopicPartition(topic, 0), new OffsetAndMetadata(101L, null)); + put(new TopicPartition(topic, 1), new OffsetAndMetadata(21L, null)); + } + }; + producer.sendOffsetsToTransaction(group1Commit, group1); + producer.sendOffsetsToTransaction(group2Commit, group2); + + assertTrue(producer.consumerGroupOffsetsHistory().isEmpty()); + + Map<String, Map<TopicPartition, OffsetAndMetadata>> expectedResult = new HashMap<>(); 
+ expectedResult.put(group1, group1Commit); + expectedResult.put(group2, group2Commit); + + producer.commitTransaction(); + assertThat(producer.consumerGroupOffsetsHistory(), equalTo(Collections.singletonList(expectedResult))); + } + + @Test + public void shouldPublishLatestAndCumulativeConsumerGroupOffsetsOnlyAfterCommitIfTransactionsAreEnabled() { + producer.initTransactions(); + producer.beginTransaction(); + + String group = "g"; + Map<TopicPartition, OffsetAndMetadata> groupCommit1 = new HashMap<TopicPartition, OffsetAndMetadata>() { + { + put(new TopicPartition(topic, 0), new OffsetAndMetadata(42L, null)); + put(new TopicPartition(topic, 1), new OffsetAndMetadata(73L, null)); + } + }; + Map<TopicPartition, OffsetAndMetadata> groupCommit2 = new HashMap<TopicPartition, OffsetAndMetadata>() { + { + put(new TopicPartition(topic, 1), new OffsetAndMetadata(101L, null)); + put(new TopicPartition(topic, 2), new OffsetAndMetadata(21L, null)); + } + }; + producer.sendOffsetsToTransaction(groupCommit1, group); + producer.sendOffsetsToTransaction(groupCommit2, group); + + assertTrue(producer.consumerGroupOffsetsHistory().isEmpty()); + + Map<String, Map<TopicPartition, OffsetAndMetadata>> expectedResult = new HashMap<>(); + expectedResult.put(group, new HashMap<TopicPartition, OffsetAndMetadata>() { + { + put(new TopicPartition(topic, 0), new OffsetAndMetadata(42L, null)); + put(new TopicPartition(topic, 1), new OffsetAndMetadata(101L, null)); + put(new TopicPartition(topic, 2), new OffsetAndMetadata(21L, null)); + } + }); + + producer.commitTransaction(); + assertThat(producer.consumerGroupOffsetsHistory(), equalTo(Collections.singletonList(expectedResult))); + } + + @Test + public void shouldDropConsumerGroupOffsetsOnAbortIfTransactionsAreEnabled() { + producer.initTransactions(); + producer.beginTransaction(); + + String group = "g"; + Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<TopicPartition, OffsetAndMetadata>() { + { + put(new 
TopicPartition(topic, 0), new OffsetAndMetadata(42L, null)); + put(new TopicPartition(topic, 1), new OffsetAndMetadata(73L, null)); + } + }; + producer.sendOffsetsToTransaction(groupCommit, group); + producer.abortTransaction(); + + producer.beginTransaction(); + producer.commitTransaction(); + assertTrue(producer.consumerGroupOffsetsHistory().isEmpty()); + } + + @Test + public void shouldPreserveCommittedConsumerGroupsOffsetsOnAbortIfTransactionsAreEnabled() { + producer.initTransactions(); + producer.beginTransaction(); + + String group = "g"; + Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<TopicPartition, OffsetAndMetadata>() { + { + put(new TopicPartition(topic, 0), new OffsetAndMetadata(42L, null)); + put(new TopicPartition(topic, 1), new OffsetAndMetadata(73L, null)); + } + }; + producer.sendOffsetsToTransaction(groupCommit, group); + producer.commitTransaction(); + + producer.beginTransaction(); + producer.abortTransaction(); + + Map<String, Map<TopicPartition, OffsetAndMetadata>> expectedResult = new HashMap<>(); + expectedResult.put(group, groupCommit); + + assertThat(producer.consumerGroupOffsetsHistory(), equalTo(Collections.singletonList(expectedResult))); + } + + @Test + public void shouldThrowOnInitTransactionIfProducerIsClosed() { + producer.close(); + try { + producer.initTransactions(); + fail("Should have thrown as producer is already closed"); + } catch (IllegalStateException e) { } + } + + @Test + public void shouldThrowOnSendIfProducerIsClosed() { + producer.close(); + try { + producer.send(null); + fail("Should have thrown as producer is already closed"); + } catch (IllegalStateException e) { } + } + + @Test + public void shouldThrowOnBeginTransactionIfProducerIsClosed() { + producer.close(); + try { + producer.beginTransaction(); + fail("Should have thrown as producer is already closed"); + } catch (IllegalStateException e) { } + } + + @Test + public void shouldThrowSendOffsetsToTransactionIfProducerIsClosed() { + 
producer.close(); + try { + producer.sendOffsetsToTransaction(null, null); + fail("Should have thrown as producer is already closed"); + } catch (IllegalStateException e) { } + } + + @Test + public void shouldThrowOnCommitTransactionIfProducerIsClosed() { + producer.close(); + try { + producer.commitTransaction(); + fail("Should have thrown as producer is already closed"); + } catch (IllegalStateException e) { } + } + + @Test + public void shouldThrowOnAbortTransactionIfProducerIsClosed() { + producer.close(); + try { + producer.abortTransaction(); + fail("Should have thrown as producer is already closed"); + } catch (IllegalStateException e) { } + } + + @Test + public void shouldThrowOnCloseIfProducerIsClosed() { + producer.close(); + try { + producer.close(); + fail("Should have thrown as producer is already closed"); + } catch (IllegalStateException e) { } + } + + @Test + public void shouldThrowOnFenceProducerIfProducerIsClosed() { + producer.close(); + try { + producer.fenceProducer(); + fail("Should have thrown as producer is already closed"); + } catch (IllegalStateException e) { } + } + private boolean isError(Future<?> future) { try { future.get();
