http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index cc30f4d..719efe9 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.Cluster; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.internals.ClusterResourceListeners; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.MetricConfig; @@ -231,20 +232,8 @@ public class SenderTest { int maxRetries = 1; Metrics m = new Metrics(); try { - Sender sender = new Sender(client, - metadata, - this.accumulator, - false, - MAX_REQUEST_SIZE, - ACKS_ALL, - maxRetries, - m, - time, - REQUEST_TIMEOUT, - 50, - null, - apiVersions - ); + Sender sender = new Sender(client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL, + maxRetries, m, time, REQUEST_TIMEOUT, 50, null, apiVersions); // do a successful retry Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; sender.run(time.milliseconds()); // connect @@ -290,20 +279,8 @@ public class SenderTest { int maxRetries = 1; Metrics m = new Metrics(); try { - Sender sender = new Sender(client, - metadata, - this.accumulator, - true, - MAX_REQUEST_SIZE, - ACKS_ALL, - maxRetries, - m, - time, - REQUEST_TIMEOUT, - 50, - null, - apiVersions - ); + Sender sender = new Sender(client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, + m, time, REQUEST_TIMEOUT, 50, null, apiVersions); // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1 Cluster cluster1 = TestUtils.clusterWith(2, "test", 2); metadata.update(cluster1, Collections.<String>emptySet(), time.milliseconds()); @@ -375,21 +352,63 @@ public class SenderTest { } @Test - public void testInitPidRequest() throws Exception { + public void testInitProducerIdRequest() throws Exception { final long producerId = 343434L; TransactionManager transactionManager = new TransactionManager(); setupWithTransactionState(transactionManager); client.setNode(new Node(1, "localhost", 33343)); + prepareAndReceiveInitProducerId(producerId, Errors.NONE); + assertTrue(transactionManager.hasProducerId()); + assertEquals(producerId, transactionManager.producerIdAndEpoch().producerId); + assertEquals((short) 0, transactionManager.producerIdAndEpoch().epoch); + } + + @Test + public void testClusterAuthorizationExceptionInInitProducerIdRequest() throws Exception { + final long producerId = 343434L; + TransactionManager transactionManager = new TransactionManager(); + setupWithTransactionState(transactionManager); + client.setNode(new Node(1, "localhost", 33343)); + prepareAndReceiveInitProducerId(producerId, Errors.CLUSTER_AUTHORIZATION_FAILED); + assertFalse(transactionManager.hasProducerId()); + assertTrue(transactionManager.isInErrorState()); + assertTrue(transactionManager.lastError() instanceof 
ClusterAuthorizationException); + + // cluster authorization is a fatal error for the producer + assertSendFailure(ClusterAuthorizationException.class); + } + + @Test + public void testClusterAuthorizationExceptionInProduceRequest() throws Exception { + final long producerId = 343434L; + TransactionManager transactionManager = new TransactionManager(); + setupWithTransactionState(transactionManager); + + client.setNode(new Node(1, "localhost", 33343)); + prepareAndReceiveInitProducerId(producerId, Errors.NONE); + assertTrue(transactionManager.hasProducerId()); + + // cluster authorization is a fatal error for the producer + Future<RecordMetadata> future = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), + null, null, MAX_BLOCK_TIMEOUT).future; client.prepareResponse(new MockClient.RequestMatcher() { @Override public boolean matches(AbstractRequest body) { - return body instanceof InitProducerIdRequest; + return body instanceof ProduceRequest && ((ProduceRequest) body).isIdempotent(); } - }, new InitProducerIdResponse(0, Errors.NONE, producerId, (short) 0)); + }, produceResponse(tp0, -1, Errors.CLUSTER_AUTHORIZATION_FAILED, 0)); + sender.run(time.milliseconds()); - assertTrue(transactionManager.hasProducerId()); - assertEquals(producerId, transactionManager.producerIdAndEpoch().producerId); - assertEquals((short) 0, transactionManager.producerIdAndEpoch().epoch); + assertTrue(future.isDone()); + try { + future.get(); + fail("Future should have raised ClusterAuthorizationException"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof ClusterAuthorizationException); + } + + // cluster authorization is a fatal error for the producer + assertSendFailure(ClusterAuthorizationException.class); } @Test @@ -402,20 +421,8 @@ public class SenderTest { int maxRetries = 10; Metrics m = new Metrics(); - Sender sender = new Sender(client, - metadata, - this.accumulator, - true, - MAX_REQUEST_SIZE, - ACKS_ALL, - maxRetries, - m, - time, - REQUEST_TIMEOUT, - 50, - transactionManager, - apiVersions - ); + Sender sender = new Sender(client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, + m, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions); Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; client.prepareResponse(new MockClient.RequestMatcher() { @@ -446,7 +453,7 @@ public class SenderTest { } @Test - public void testAbortRetryWhenPidChanges() throws InterruptedException { + public void testAbortRetryWhenProducerIdChanges() throws InterruptedException { final long producerId = 343434L; TransactionManager transactionManager = new TransactionManager(); transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0)); @@ -455,20 +462,8 @@ public class SenderTest { int maxRetries = 10; Metrics m = new Metrics(); - Sender sender = new Sender(client, - metadata, - this.accumulator, - true, - MAX_REQUEST_SIZE, - ACKS_ALL, - maxRetries, - m, - time, - REQUEST_TIMEOUT, - 50, - transactionManager, - apiVersions - ); + Sender sender = new Sender(client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, + m, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions); Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; sender.run(time.milliseconds()); // connect. 
@@ -504,20 +499,8 @@ public class SenderTest { int maxRetries = 10; Metrics m = new Metrics(); - Sender sender = new Sender(client, - metadata, - this.accumulator, - true, - MAX_REQUEST_SIZE, - ACKS_ALL, - maxRetries, - m, - time, - REQUEST_TIMEOUT, - 50, - transactionManager, - apiVersions - ); + Sender sender = new Sender(client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, + m, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions); Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; sender.run(time.milliseconds()); // connect. @@ -642,20 +625,38 @@ public class SenderTest { metricTags.put("client-id", CLIENT_ID); MetricConfig metricConfig = new MetricConfig().tags(metricTags); this.metrics = new Metrics(metricConfig, time); - this.accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time, apiVersions, transactionManager); - this.sender = new Sender(this.client, - this.metadata, - this.accumulator, - true, - MAX_REQUEST_SIZE, - ACKS_ALL, - MAX_RETRIES, - this.metrics, - this.time, - REQUEST_TIMEOUT, - 50, - transactionManager, - apiVersions); + this.accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time, + apiVersions, transactionManager); + this.sender = new Sender(this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, + MAX_RETRIES, this.metrics, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions); this.metadata.update(this.cluster, Collections.<String>emptySet(), time.milliseconds()); } + + private void assertSendFailure(Class<? extends RuntimeException> expectedError) throws Exception { + Future<RecordMetadata> future = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), + null, null, MAX_BLOCK_TIMEOUT).future; + sender.run(time.milliseconds()); + assertTrue(future.isDone()); + try { + future.get(); + fail("Future should have raised " + expectedError.getSimpleName()); + } catch (ExecutionException e) { + assertTrue(expectedError.isAssignableFrom(e.getCause().getClass())); + } + } + + private void prepareAndReceiveInitProducerId(long producerId, Errors error) { + short producerEpoch = 0; + if (error != Errors.NONE) + producerEpoch = RecordBatch.NO_PRODUCER_EPOCH; + + client.prepareResponse(new MockClient.RequestMatcher() { + @Override + public boolean matches(AbstractRequest body) { + return body instanceof InitProducerIdRequest && ((InitProducerIdRequest) body).transactionalId() == null; + } + }, new InitProducerIdResponse(0, error, producerId, producerEpoch)); + sender.run(time.milliseconds()); + } + }
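The new SenderTest cases above pin down the client-side contract: CLUSTER_AUTHORIZATION_FAILED on either the InitProducerId or the Produce request is fatal for an idempotent producer, and every subsequent send fails with ClusterAuthorizationException. For reference, a minimal sketch of how an application would observe this failure mode, assuming a broker that denies the IdempotentWrite ACL (the bootstrap address and topic name are placeholders, not part of this patch):

    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.errors.ClusterAuthorizationException;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class IdempotentSendSketch {
        public static void main(String[] args) throws InterruptedException {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            // Enabling idempotence triggers the InitProducerId request exercised by the tests above.
            props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("my-topic", "key", "value")).get();
            } catch (ExecutionException e) {
                if (e.getCause() instanceof ClusterAuthorizationException) {
                    // Fatal for this producer instance: later sends fail with the same error,
                    // mirroring assertSendFailure(ClusterAuthorizationException.class) above.
                    System.err.println("Producer lacks IdempotentWrite on the cluster: " + e.getCause());
                }
            }
        }
    }
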
http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index fcf0488..e9363d0 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -20,12 +20,14 @@ import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.errors.TransactionalIdAuthorizationException; import org.apache.kafka.common.internals.ClusterResourceListeners; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; @@ -43,6 +45,7 @@ import org.apache.kafka.common.requests.AddPartitionsToTxnResponse; import org.apache.kafka.common.requests.EndTxnRequest; import org.apache.kafka.common.requests.EndTxnResponse; import org.apache.kafka.common.requests.FindCoordinatorRequest; +import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType; import org.apache.kafka.common.requests.FindCoordinatorResponse; import org.apache.kafka.common.requests.InitProducerIdRequest; import org.apache.kafka.common.requests.InitProducerIdResponse; @@ -65,6 +68,8 @@ import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import static java.util.Collections.singleton; +import static java.util.Collections.singletonMap; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -118,6 +123,7 @@ public class TransactionManagerTest { transactionManager, apiVersions); this.metadata.update(this.cluster, Collections.<String>emptySet(), time.milliseconds()); + client.setNode(brokerNode); } @Test(expected = IllegalStateException.class) @@ -134,7 +140,6 @@ public class TransactionManagerTest { assertEquals((int) transactionManager.sequenceNumber(tp0), 3); } - @Test public void testProducerIdReset() { TransactionManager transactionManager = new TransactionManager(); @@ -147,23 +152,13 @@ public class TransactionManagerTest { @Test public void testBasicTransaction() throws InterruptedException { - client.setNode(brokerNode); // This is called from the initTransactions method in the producer as the first order of business. // It finds the coordinator and then gets a PID. 
final long pid = 13131L; final short epoch = 1; - transactionManager.initializeTransactions(); - prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId); - sender.run(time.milliseconds()); // find coordinator - sender.run(time.milliseconds()); - assertEquals(brokerNode, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION)); - - prepareInitPidResponse(Errors.NONE, false, pid, epoch); + doInitTransactions(pid, epoch); - sender.run(time.milliseconds()); // get pid. - - assertTrue(transactionManager.hasProducerId()); transactionManager.beginTransaction(); transactionManager.maybeAddPartitionToTransaction(tp0); @@ -199,13 +194,13 @@ public class TransactionManagerTest { Map<TopicPartition, Errors> txnOffsetCommitResponse = new HashMap<>(); txnOffsetCommitResponse.put(tp1, Errors.NONE); - prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.GROUP, consumerGroupId); + prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.GROUP, consumerGroupId); prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch, txnOffsetCommitResponse); - assertEquals(null, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP)); + assertEquals(null, transactionManager.coordinator(CoordinatorType.GROUP)); sender.run(time.milliseconds()); // try to send TxnOffsetCommitRequest, but find we don't have a group coordinator. sender.run(time.milliseconds()); // send find coordinator for group request - assertNotNull(transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP)); + assertNotNull(transactionManager.coordinator(CoordinatorType.GROUP)); assertTrue(transactionManager.hasPendingOffsetCommits()); sender.run(time.milliseconds()); // send TxnOffsetCommitRequest commit. @@ -224,42 +219,40 @@ public class TransactionManagerTest { @Test public void testDisconnectAndRetry() { - client.setNode(brokerNode); // This is called from the initTransactions method in the producer as the first order of business. // It finds the coordinator and then gets a PID. transactionManager.initializeTransactions(); - prepareFindCoordinatorResponse(Errors.NONE, true, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId); + prepareFindCoordinatorResponse(Errors.NONE, true, CoordinatorType.TRANSACTION, transactionalId); sender.run(time.milliseconds()); // find coordinator, connection lost. - prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId); + prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId); sender.run(time.milliseconds()); // find coordinator sender.run(time.milliseconds()); - assertEquals(brokerNode, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION)); + assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION)); } @Test public void testCoordinatorLost() { - client.setNode(brokerNode); // This is called from the initTransactions method in the producer as the first order of business. // It finds the coordinator and then gets a PID. 
final long pid = 13131L; final short epoch = 1; TransactionalRequestResult initPidResult = transactionManager.initializeTransactions(); - prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId); + prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId); sender.run(time.milliseconds()); // find coordinator sender.run(time.milliseconds()); - assertEquals(brokerNode, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION)); + assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION)); prepareInitPidResponse(Errors.NOT_COORDINATOR, false, pid, epoch); sender.run(time.milliseconds()); // send pid, get not coordinator. Should resend the FindCoordinator and InitPid requests - assertEquals(null, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION)); + assertEquals(null, transactionManager.coordinator(CoordinatorType.TRANSACTION)); assertFalse(initPidResult.isCompleted()); assertFalse(transactionManager.hasProducerId()); - prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId); + prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId); sender.run(time.milliseconds()); - assertEquals(brokerNode, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION)); + assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION)); assertFalse(initPidResult.isCompleted()); prepareInitPidResponse(Errors.NONE, false, pid, epoch); sender.run(time.milliseconds()); // get pid and epoch @@ -271,24 +264,216 @@ public class TransactionManagerTest { } @Test - public void testFlushPendingPartitionsOnCommit() throws InterruptedException { - client.setNode(brokerNode); - // This is called from the initTransactions method in the producer as the first order of business. - // It finds the coordinator and then gets a PID. 
+ public void testTransactionalIdAuthorizationFailureInFindCoordinator() { + TransactionalRequestResult initPidResult = transactionManager.initializeTransactions(); + prepareFindCoordinatorResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED, false, + CoordinatorType.TRANSACTION, transactionalId); + sender.run(time.milliseconds()); // find coordinator + sender.run(time.milliseconds()); + + assertTrue(transactionManager.isInErrorState()); + assertTrue(transactionManager.lastError() instanceof TransactionalIdAuthorizationException); + + sender.run(time.milliseconds()); // one more run to fail the InitProducerId future + assertTrue(initPidResult.isCompleted()); + assertFalse(initPidResult.isSuccessful()); + assertTrue(initPidResult.error() instanceof TransactionalIdAuthorizationException); + + assertFatalError(TransactionalIdAuthorizationException.class); + } + + @Test + public void testTransactionalIdAuthorizationFailureInInitProducerId() { + final long pid = 13131L; + TransactionalRequestResult initPidResult = transactionManager.initializeTransactions(); + prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId); + sender.run(time.milliseconds()); // find coordinator + sender.run(time.milliseconds()); + assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION)); + + prepareInitPidResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED, false, pid, RecordBatch.NO_PRODUCER_EPOCH); + sender.run(time.milliseconds()); + + assertTrue(transactionManager.isInErrorState()); + assertTrue(initPidResult.isCompleted()); + assertFalse(initPidResult.isSuccessful()); + assertTrue(initPidResult.error() instanceof TransactionalIdAuthorizationException); + + assertFatalError(TransactionalIdAuthorizationException.class); + } + + @Test + public void testGroupAuthorizationFailureInFindCoordinator() { + final String consumerGroupId = "consumer"; final long pid = 13131L; final short epoch = 1; - transactionManager.initializeTransactions(); - prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId); - sender.run(time.milliseconds()); // find coordinator + doInitTransactions(pid, epoch); + + transactionManager.beginTransaction(); + TransactionalRequestResult sendOffsetsResult = transactionManager.sendOffsetsToTransaction( + singletonMap(new TopicPartition("foo", 0), new OffsetAndMetadata(39L)), consumerGroupId); + + prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, pid, epoch); + sender.run(time.milliseconds()); // AddOffsetsToTxn Handled, TxnOffsetCommit Enqueued + sender.run(time.milliseconds()); // FindCoordinator Enqueued + + prepareFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, false, CoordinatorType.GROUP, consumerGroupId); + sender.run(time.milliseconds()); // FindCoordinator Failed + sender.run(time.milliseconds()); // TxnOffsetCommit Aborted + assertTrue(transactionManager.isInErrorState()); + assertTrue(transactionManager.lastError() instanceof GroupAuthorizationException); + assertTrue(sendOffsetsResult.isCompleted()); + assertFalse(sendOffsetsResult.isSuccessful()); + assertTrue(sendOffsetsResult.error() instanceof GroupAuthorizationException); + + GroupAuthorizationException exception = (GroupAuthorizationException) sendOffsetsResult.error(); + assertEquals(consumerGroupId, exception.groupId()); + + assertAbortableError(GroupAuthorizationException.class); + } + + @Test + public void testGroupAuthorizationFailureInTxnOffsetCommit() { + final String 
consumerGroupId = "consumer"; + final long pid = 13131L; + final short epoch = 1; + final TopicPartition tp = new TopicPartition("foo", 0); + + doInitTransactions(pid, epoch); + + transactionManager.beginTransaction(); + TransactionalRequestResult sendOffsetsResult = transactionManager.sendOffsetsToTransaction( + singletonMap(tp, new OffsetAndMetadata(39L)), consumerGroupId); + + prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, pid, epoch); + sender.run(time.milliseconds()); // AddOffsetsToTxn Handled, TxnOffsetCommit Enqueued + sender.run(time.milliseconds()); // FindCoordinator Enqueued + + prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.GROUP, consumerGroupId); + sender.run(time.milliseconds()); // FindCoordinator Returned + + prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch, singletonMap(tp, Errors.GROUP_AUTHORIZATION_FAILED)); + sender.run(time.milliseconds()); // TxnOffsetCommit Handled + + assertTrue(transactionManager.isInErrorState()); + assertTrue(transactionManager.lastError() instanceof GroupAuthorizationException); + assertTrue(sendOffsetsResult.isCompleted()); + assertFalse(sendOffsetsResult.isSuccessful()); + assertTrue(sendOffsetsResult.error() instanceof GroupAuthorizationException); + + GroupAuthorizationException exception = (GroupAuthorizationException) sendOffsetsResult.error(); + assertEquals(consumerGroupId, exception.groupId()); + + assertAbortableError(GroupAuthorizationException.class); + } + + @Test + public void testTransactionalIdAuthorizationFailureInAddOffsetsToTxn() { + final String consumerGroupId = "consumer"; + final long pid = 13131L; + final short epoch = 1; + final TopicPartition tp = new TopicPartition("foo", 0); + + doInitTransactions(pid, epoch); + + transactionManager.beginTransaction(); + TransactionalRequestResult sendOffsetsResult = transactionManager.sendOffsetsToTransaction( + singletonMap(tp, new OffsetAndMetadata(39L)), consumerGroupId); + + prepareAddOffsetsToTxnResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED, consumerGroupId, pid, epoch); + sender.run(time.milliseconds()); // AddOffsetsToTxn Handled + + assertTrue(transactionManager.isInErrorState()); + assertTrue(transactionManager.lastError() instanceof TransactionalIdAuthorizationException); + assertTrue(sendOffsetsResult.isCompleted()); + assertFalse(sendOffsetsResult.isSuccessful()); + assertTrue(sendOffsetsResult.error() instanceof TransactionalIdAuthorizationException); + + assertFatalError(TransactionalIdAuthorizationException.class); + } + + @Test + public void testTransactionalIdAuthorizationFailureInTxnOffsetCommit() { + final String consumerGroupId = "consumer"; + final long pid = 13131L; + final short epoch = 1; + final TopicPartition tp = new TopicPartition("foo", 0); + + doInitTransactions(pid, epoch); + + transactionManager.beginTransaction(); + TransactionalRequestResult sendOffsetsResult = transactionManager.sendOffsetsToTransaction( + singletonMap(tp, new OffsetAndMetadata(39L)), consumerGroupId); + + prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, pid, epoch); + sender.run(time.milliseconds()); // AddOffsetsToTxn Handled, TxnOffsetCommit Enqueued + sender.run(time.milliseconds()); // FindCoordinator Enqueued + + prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.GROUP, consumerGroupId); + sender.run(time.milliseconds()); // FindCoordinator Returned + + prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch, singletonMap(tp, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED)); + 
sender.run(time.milliseconds()); // TxnOffsetCommit Handled + + assertTrue(transactionManager.isInErrorState()); + assertTrue(transactionManager.lastError() instanceof TransactionalIdAuthorizationException); + assertTrue(sendOffsetsResult.isCompleted()); + assertFalse(sendOffsetsResult.isSuccessful()); + assertTrue(sendOffsetsResult.error() instanceof TransactionalIdAuthorizationException); + + assertFatalError(TransactionalIdAuthorizationException.class); + } + + @Test + public void testTopicAuthorizationFailureInAddPartitions() { + final long pid = 13131L; + final short epoch = 1; + final TopicPartition tp = new TopicPartition("foo", 0); + + doInitTransactions(pid, epoch); + + transactionManager.beginTransaction(); + transactionManager.maybeAddPartitionToTransaction(tp); + + prepareAddPartitionsToTxn(tp, Errors.TOPIC_AUTHORIZATION_FAILED); sender.run(time.milliseconds()); - assertEquals(brokerNode, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION)); - prepareInitPidResponse(Errors.NONE, false, pid, epoch); + assertTrue(transactionManager.isInErrorState()); + assertTrue(transactionManager.lastError() instanceof TopicAuthorizationException); - sender.run(time.milliseconds()); // get pid. + TopicAuthorizationException exception = (TopicAuthorizationException) transactionManager.lastError(); + assertEquals(singleton(tp.topic()), exception.unauthorizedTopics()); - assertTrue(transactionManager.hasProducerId()); + assertAbortableError(TopicAuthorizationException.class); + } + + @Test + public void testTransactionalIdAuthorizationFailureInAddPartitions() { + final long pid = 13131L; + final short epoch = 1; + final TopicPartition tp = new TopicPartition("foo", 0); + + doInitTransactions(pid, epoch); + + transactionManager.beginTransaction(); + transactionManager.maybeAddPartitionToTransaction(tp); + + prepareAddPartitionsToTxn(tp, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED); + sender.run(time.milliseconds()); + + assertTrue(transactionManager.isInErrorState()); + assertTrue(transactionManager.lastError() instanceof TransactionalIdAuthorizationException); + + assertFatalError(TransactionalIdAuthorizationException.class); + } + + @Test + public void testFlushPendingPartitionsOnCommit() throws InterruptedException { + final long pid = 13131L; + final short epoch = 1; + + doInitTransactions(pid, epoch); transactionManager.beginTransaction(); transactionManager.maybeAddPartitionToTransaction(tp0); @@ -329,23 +514,11 @@ public class TransactionManagerTest { @Test public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedException { - client.setNode(brokerNode); - // This is called from the initTransactions method in the producer as the first order of business. - // It finds the coordinator and then gets a PID. final long pid = 13131L; final short epoch = 1; - transactionManager.initializeTransactions(); - prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId); - - sender.run(time.milliseconds()); // find coordinator - sender.run(time.milliseconds()); - assertEquals(brokerNode, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION)); - - prepareInitPidResponse(Errors.NONE, false, pid, epoch); - sender.run(time.milliseconds()); // get pid. 
+ doInitTransactions(pid, epoch); - assertTrue(transactionManager.hasProducerId()); transactionManager.beginTransaction(); // User does one producer.send transactionManager.maybeAddPartitionToTransaction(tp0); @@ -392,28 +565,16 @@ @Test(expected = ExecutionException.class) public void testProducerFencedException() throws InterruptedException, ExecutionException { - client.setNode(brokerNode); - // This is called from the initTransactions method in the producer as the first order of business. - // It finds the coordinator and then gets a PID. final long pid = 13131L; final short epoch = 1; - transactionManager.initializeTransactions(); - prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId); - sender.run(time.milliseconds()); // find coordinator - sender.run(time.milliseconds()); - assertEquals(brokerNode, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION)); - - prepareInitPidResponse(Errors.NONE, false, pid, epoch); + doInitTransactions(pid, epoch); - sender.run(time.milliseconds()); // get pid. - - assertTrue(transactionManager.hasProducerId()); transactionManager.beginTransaction(); transactionManager.maybeAddPartitionToTransaction(tp0); Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), - "value".getBytes(), Record.EMPTY_HEADERS, new MockCallback(transactionManager), MAX_BLOCK_TIMEOUT).future; + "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future; assertFalse(responseFuture.isDone()); prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid); @@ -429,28 +590,16 @@ @Test public void testDisallowCommitOnProduceFailure() throws InterruptedException { - client.setNode(brokerNode); - // This is called from the initTransactions method in the producer as the first order of business. - // It finds the coordinator and then gets a PID. final long pid = 13131L; final short epoch = 1; - transactionManager.initializeTransactions(); - prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId); - sender.run(time.milliseconds()); // find coordinator - sender.run(time.milliseconds()); - assertEquals(brokerNode, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION)); - - prepareInitPidResponse(Errors.NONE, false, pid, epoch); + doInitTransactions(pid, epoch); - sender.run(time.milliseconds()); // get pid. - - assertTrue(transactionManager.hasProducerId()); transactionManager.beginTransaction(); transactionManager.maybeAddPartitionToTransaction(tp0); Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), - "value".getBytes(), Record.EMPTY_HEADERS, new MockCallback(transactionManager), MAX_BLOCK_TIMEOUT).future; + "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future; TransactionalRequestResult commitResult = transactionManager.beginCommittingTransaction(); assertFalse(responseFuture.isDone()); @@ -483,28 +632,16 @@ @Test public void testAllowAbortOnProduceFailure() throws InterruptedException { - client.setNode(brokerNode); - // This is called from the initTransactions method in the producer as the first order of business. - // It finds the coordinator and then gets a PID.
final long pid = 13131L; final short epoch = 1; - transactionManager.initializeTransactions(); - prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId); - - sender.run(time.milliseconds()); // find coordinator - sender.run(time.milliseconds()); - - prepareInitPidResponse(Errors.NONE, false, pid, epoch); - sender.run(time.milliseconds()); // get pid. - assertEquals(brokerNode, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION)); - assertTrue(transactionManager.hasProducerId()); + doInitTransactions(pid, epoch); transactionManager.beginTransaction(); transactionManager.maybeAddPartitionToTransaction(tp0); Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), - "value".getBytes(), Record.EMPTY_HEADERS, new MockCallback(transactionManager), MAX_BLOCK_TIMEOUT).future; + "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future; TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction(); assertFalse(responseFuture.isDone()); @@ -524,28 +661,16 @@ public class TransactionManagerTest { @Test public void testHandlingOfUnknownTopicPartitionErrorOnAddPartitions() throws InterruptedException { - client.setNode(brokerNode); - // This is called from the initTransactions method in the producer as the first order of business. - // It finds the coordinator and then gets a PID. final long pid = 13131L; final short epoch = 1; - transactionManager.initializeTransactions(); - prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId); - sender.run(time.milliseconds()); // find coordinator - sender.run(time.milliseconds()); - assertEquals(brokerNode, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION)); - - prepareInitPidResponse(Errors.NONE, false, pid, epoch); - - sender.run(time.milliseconds()); // get pid. + doInitTransactions(pid, epoch); - assertTrue(transactionManager.hasProducerId()); transactionManager.beginTransaction(); transactionManager.maybeAddPartitionToTransaction(tp0); Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), - "value".getBytes(), Record.EMPTY_HEADERS, new MockCallback(transactionManager), MAX_BLOCK_TIMEOUT).future; + "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future; assertFalse(responseFuture.isDone()); prepareAddPartitionsToTxnResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, tp0, epoch, pid); @@ -564,23 +689,11 @@ public class TransactionManagerTest { @Test public void testHandlingOfUnknownTopicPartitionErrorOnTxnOffsetCommit() throws InterruptedException { - client.setNode(brokerNode); - // This is called from the initTransactions method in the producer as the first order of business. - // It finds the coordinator and then gets a PID. final long pid = 13131L; final short epoch = 1; - transactionManager.initializeTransactions(); - prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId); - sender.run(time.milliseconds()); // find coordinator - sender.run(time.milliseconds()); - assertEquals(brokerNode, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION)); - - prepareInitPidResponse(Errors.NONE, false, pid, epoch); + doInitTransactions(pid, epoch); - sender.run(time.milliseconds()); // get pid. 
- - assertTrue(transactionManager.hasProducerId()); transactionManager.beginTransaction(); Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); @@ -597,13 +710,13 @@ public class TransactionManagerTest { Map<TopicPartition, Errors> txnOffsetCommitResponse = new HashMap<>(); txnOffsetCommitResponse.put(tp1, Errors.UNKNOWN_TOPIC_OR_PARTITION); - prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.GROUP, consumerGroupId); + prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.GROUP, consumerGroupId); prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch, txnOffsetCommitResponse); - assertEquals(null, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP)); + assertEquals(null, transactionManager.coordinator(CoordinatorType.GROUP)); sender.run(time.milliseconds()); // try to send TxnOffsetCommitRequest, but find we don't have a group coordinator. sender.run(time.milliseconds()); // send find coordinator for group request - assertNotNull(transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP)); + assertNotNull(transactionManager.coordinator(CoordinatorType.GROUP)); assertTrue(transactionManager.hasPendingOffsetCommits()); sender.run(time.milliseconds()); // send TxnOffsetCommitRequest request. @@ -625,58 +738,35 @@ public class TransactionManagerTest { } private void verifyAddPartitionsFailsWithPartitionLevelError(final Errors error) throws InterruptedException { - client.setNode(brokerNode); - transactionManager.initializeTransactions(); - prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId); - - sender.run(time.milliseconds()); // find coordinator - sender.run(time.milliseconds()); - final long pid = 1L; final short epoch = 1; - prepareInitPidResponse(Errors.NONE, false, pid, epoch); + doInitTransactions(pid, epoch); - sender.run(time.milliseconds()); // get pid. - - assertTrue(transactionManager.hasProducerId()); transactionManager.beginTransaction(); transactionManager.maybeAddPartitionToTransaction(tp0); Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), - "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future; + "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future; assertFalse(responseFuture.isDone()); - prepareAddPartitionsToTxnPartitionErrorResponse(tp0, error); + prepareAddPartitionsToTxn(tp0, error); sender.run(time.milliseconds()); // attempt send addPartitions. 
assertTrue(transactionManager.isInErrorState()); assertFalse(transactionManager.transactionContainsPartition(tp0)); } - private void prepareAddPartitionsToTxnPartitionErrorResponse(final TopicPartition tp0, final Errors error) { + private void prepareAddPartitionsToTxn(final TopicPartition tp, final Errors error) { client.prepareResponse(new MockClient.RequestMatcher() { @Override public boolean matches(AbstractRequest body) { - assertTrue(body instanceof AddPartitionsToTxnRequest); - return true; - } - }, new AddPartitionsToTxnResponse(0, Collections.singletonMap(tp0, error))); - } - - private static class MockCallback implements Callback { - private final TransactionManager transactionManager; - public MockCallback(TransactionManager transactionManager) { - this.transactionManager = transactionManager; - } - @Override - public void onCompletion(RecordMetadata metadata, Exception exception) { - if (exception != null && transactionManager != null) { - transactionManager.setError(exception); + return body instanceof AddPartitionsToTxnRequest && + ((AddPartitionsToTxnRequest) body).partitions().contains(tp); } - } + }, new AddPartitionsToTxnResponse(0, singletonMap(tp, error))); } private void prepareFindCoordinatorResponse(Errors error, boolean shouldDisconnect, - final FindCoordinatorRequest.CoordinatorType coordinatorType, + final CoordinatorType coordinatorType, final String coordinatorKey) { client.prepareResponse(new MockClient.RequestMatcher() { @Override @@ -733,7 +823,7 @@ public class TransactionManagerTest { assertEquals(transactionalId, addPartitionsToTxnRequest.transactionalId()); return true; } - }, new AddPartitionsToTxnResponse(0, Collections.singletonMap(topicPartition, error))); + }, new AddPartitionsToTxnResponse(0, singletonMap(topicPartition, error))); } private void prepareEndTxnResponse(Errors error, final TransactionResult result, final long pid, final short epoch) { @@ -782,9 +872,54 @@ public class TransactionManagerTest { private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs) { ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(error, offset, RecordBatch.NO_TIMESTAMP); - Map<TopicPartition, ProduceResponse.PartitionResponse> partResp = Collections.singletonMap(tp, resp); + Map<TopicPartition, ProduceResponse.PartitionResponse> partResp = singletonMap(tp, resp); return new ProduceResponse(partResp, throttleTimeMs); } + private void doInitTransactions(long pid, short epoch) { + transactionManager.initializeTransactions(); + prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId); + sender.run(time.milliseconds()); // find coordinator + sender.run(time.milliseconds()); + assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION)); + + prepareInitPidResponse(Errors.NONE, false, pid, epoch); + sender.run(time.milliseconds()); // get pid. + assertTrue(transactionManager.hasProducerId()); + } + private void assertAbortableError(Class<? extends RuntimeException> cause) { + try { + transactionManager.beginTransaction(); + fail("Should have raised " + cause.getSimpleName()); + } catch (KafkaException e) { + assertTrue(cause.isAssignableFrom(e.getCause().getClass())); + assertTrue(transactionManager.isInErrorState()); + } + + assertTrue(transactionManager.isInErrorState()); + transactionManager.beginAbortingTransaction(); + assertFalse(transactionManager.isInErrorState()); + } + + private void assertFatalError(Class<? 
extends RuntimeException> cause) { + assertTrue(transactionManager.isInErrorState()); + + try { + transactionManager.beginAbortingTransaction(); + fail("Should have raised " + cause.getSimpleName()); + } catch (KafkaException e) { + assertTrue(cause.isAssignableFrom(e.getCause().getClass())); + assertTrue(transactionManager.isInErrorState()); + } + + // Transaction abort cannot clear fatal error state + try { + transactionManager.beginAbortingTransaction(); + fail("Should have raised " + cause.getSimpleName()); + } catch (KafkaException e) { + assertTrue(cause.isAssignableFrom(e.getCause().getClass())); + assertTrue(transactionManager.isInErrorState()); + } + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 9142c90..2e9a688 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -115,10 +115,10 @@ public class RequestResponseTest { checkErrorResponse(createListOffsetRequest(2), new UnknownServerException()); checkResponse(createListOffsetResponse(2), 2); checkRequest(MetadataRequest.Builder.allTopics().build((short) 2)); - checkRequest(createMetadataRequest(1, asList("topic1"))); - checkErrorResponse(createMetadataRequest(1, asList("topic1")), new UnknownServerException()); + checkRequest(createMetadataRequest(1, singletonList("topic1"))); + checkErrorResponse(createMetadataRequest(1, singletonList("topic1")), new UnknownServerException()); checkResponse(createMetadataResponse(), 2); - checkErrorResponse(createMetadataRequest(2, asList("topic1")), new UnknownServerException()); + checkErrorResponse(createMetadataRequest(2, singletonList("topic1")), new UnknownServerException()); checkRequest(createOffsetCommitRequest(2)); checkErrorResponse(createOffsetCommitRequest(2), new UnknownServerException()); checkResponse(createOffsetCommitResponse(), 0); @@ -183,7 +183,7 @@ public class RequestResponseTest { checkOlderFetchVersions(); checkResponse(createMetadataResponse(), 0); checkResponse(createMetadataResponse(), 1); - checkErrorResponse(createMetadataRequest(1, asList("topic1")), new UnknownServerException()); + checkErrorResponse(createMetadataRequest(1, singletonList("topic1")), new UnknownServerException()); checkRequest(createOffsetCommitRequest(0)); checkErrorResponse(createOffsetCommitRequest(0), new UnknownServerException()); checkRequest(createOffsetCommitRequest(1)); @@ -984,7 +984,7 @@ public class RequestResponseTest { final Map<TopicPartition, TxnOffsetCommitRequest.CommittedOffset> offsets = new HashMap<>(); offsets.put(new TopicPartition("topic", 73), new TxnOffsetCommitRequest.CommittedOffset(100, null)); - return new TxnOffsetCommitRequest.Builder("gid", 21L, (short) 42, offsets).build(); + return new TxnOffsetCommitRequest.Builder("transactionalId", "groupId", 21L, (short) 42, offsets).build(); } private TxnOffsetCommitResponse createTxnOffsetCommitResponse() { http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/core/src/main/scala/kafka/admin/AclCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala 
b/core/src/main/scala/kafka/admin/AclCommand.scala index 925c407..e02b5dc 100644 --- a/core/src/main/scala/kafka/admin/AclCommand.scala +++ b/core/src/main/scala/kafka/admin/AclCommand.scala @@ -34,7 +34,8 @@ object AclCommand { Broker -> Set(DescribeConfigs), Topic -> Set(Read, Write, Describe, Delete, DescribeConfigs, AlterConfigs, All), Group -> Set(Read, Describe, All), - Cluster -> Set(Create, ClusterAction, DescribeConfigs, AlterConfigs, All) + Cluster -> Set(Create, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite, All), + TransactionalId -> Set(Describe, Write, All) ) def main(args: Array[String]) { @@ -88,7 +89,7 @@ object AclCommand { CommandLineUtils.printUsageAndDie(opts.parser, "You must specify one of: --allow-principal, --deny-principal when trying to add ACLs.") for ((resource, acls) <- resourceToAcl) { - println(s"Adding ACLs for resource `${resource}`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline") + println(s"Adding ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline") authorizer.addAcls(acls, resource) } @@ -102,10 +103,10 @@ object AclCommand { for ((resource, acls) <- resourceToAcl) { if (acls.isEmpty) { - if (confirmAction(opts, s"Are you sure you want to delete all ACLs for resource `${resource}`? (y/n)")) + if (confirmAction(opts, s"Are you sure you want to delete all ACLs for resource `$resource`? (y/n)")) authorizer.removeAcls(resource) } else { - if (confirmAction(opts, s"Are you sure you want to remove ACLs: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline from resource `${resource}`? (y/n)")) + if (confirmAction(opts, s"Are you sure you want to remove ACLs: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline from resource `$resource`? (y/n)")) authorizer.removeAcls(acls, resource) } } @@ -123,7 +124,7 @@ object AclCommand { else resources.map(resource => resource -> authorizer.getAcls(resource)) for ((resource, acls) <- resourceToAcls) - println(s"Current ACLs for resource `${resource}`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline") + println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline") } } @@ -149,12 +150,16 @@ object AclCommand { private def getProducerResourceToAcls(opts: AclCommandOptions): Map[Resource, Set[Acl]] = { val topics: Set[Resource] = getResource(opts).filter(_.resourceType == Topic) + val transactionalIds: Set[Resource] = getResource(opts).filter(_.resourceType == TransactionalId) + val enableIdempotence = opts.options.has(opts.idempotentOpt) val acls = getAcl(opts, Set(Write, Describe)) - //Write, Describe permission on topics, Create permission on cluster - topics.map(_ -> acls).toMap[Resource, Set[Acl]] + - (Resource.ClusterResource -> getAcl(opts, Set(Create))) + //Write, Describe permission on topics, Create permission on cluster, Write, Describe on transactionalIds + topics.map(_ -> acls).toMap[Resource, Set[Acl]] ++ + transactionalIds.map(_ -> acls).toMap[Resource, Set[Acl]] + + (Resource.ClusterResource -> (getAcl(opts, Set(Create)) ++ + (if (enableIdempotence) getAcl(opts, Set(IdempotentWrite)) else Set.empty[Acl]))) } private def getConsumerResourceToAcls(opts: AclCommandOptions): Map[Resource, Set[Acl]] = { @@ -184,7 +189,7 @@ object AclCommand { val allowedHosts = getHosts(opts, opts.allowHostsOpt, opts.allowPrincipalsOpt) - val deniedHosts = getHosts(opts, opts.denyHostssOpt, opts.denyPrincipalsOpt) + val deniedHosts = getHosts(opts, opts.denyHostsOpt, opts.denyPrincipalsOpt) val acls = 
new collection.mutable.HashSet[Acl] if (allowedHosts.nonEmpty && allowedPrincipals.nonEmpty) @@ -232,7 +237,7 @@ object AclCommand { if (opts.options.has(opts.topicOpt)) opts.options.valuesOf(opts.topicOpt).asScala.foreach(topic => resources += new Resource(Topic, topic.trim)) - if (opts.options.has(opts.clusterOpt)) + if (opts.options.has(opts.clusterOpt) || opts.options.has(opts.idempotentOpt)) resources += Resource.ClusterResource if (opts.options.has(opts.groupOpt)) @@ -241,6 +246,10 @@ object AclCommand { if (opts.options.has(opts.brokerOpt)) opts.options.valuesOf(opts.brokerOpt).asScala.foreach(broker => resources += new Resource(Broker, broker.toString)) + if (opts.options.has(opts.transactionalIdOpt)) + opts.options.valuesOf(opts.transactionalIdOpt).asScala.foreach(transactionalId => + resources += new Resource(TransactionalId, transactionalId)) + if (resources.isEmpty && dieIfNoResourceFound) CommandLineUtils.printUsageAndDie(opts.parser, "You must provide at least one resource: --topic <topic> or --cluster or --group <group>") @@ -295,6 +304,16 @@ object AclCommand { .describedAs("broker") .ofType(classOf[Int]) + val transactionalIdOpt = parser.accepts("transactional-id", "The transactionalId to which ACLs should " + + "be added or removed. A value of * indicates the ACLs should apply to all transactionalIds.") + .withRequiredArg + .describedAs("transactional-id") + .ofType(classOf[String]) + + val idempotentOpt = parser.accepts("idempotent", "Enable idempotence for the producer. This should be " + + "used in combination with the --producer option. Note that idempotence is enabled automatically if " + + "the producer is authorized to a particular transactional-id.") + val addOpt = parser.accepts("add", "Indicates you are trying to add ACLs.") val removeOpt = parser.accepts("remove", "Indicates you are trying to remove ACLs.") val listOpt = parser.accepts("list", "List ACLs for the specified resource, use --topic <topic> or --group <group> or --cluster to specify a resource.") @@ -329,7 +348,7 @@ object AclCommand { .describedAs("allow-host") .ofType(classOf[String]) - val denyHostssOpt = parser.accepts("deny-host", "Host from which principals listed in --deny-principal will be denied access. " + + val denyHostsOpt = parser.accepts("deny-host", "Host from which principals listed in --deny-principal will be denied access. " + "If you have specified --deny-principal then the default for this option will be set to * which denies access from all hosts.") .withRequiredArg .describedAs("deny-host") @@ -354,17 +373,20 @@ object AclCommand { if (actions != 1) CommandLineUtils.printUsageAndDie(parser, "Command must include exactly one action: --list, --add, --remove. ") - CommandLineUtils.checkInvalidArgs(parser, options, listOpt, Set(producerOpt, consumerOpt, allowHostsOpt, allowPrincipalsOpt, denyHostssOpt, denyPrincipalsOpt)) + CommandLineUtils.checkInvalidArgs(parser, options, listOpt, Set(producerOpt, consumerOpt, allowHostsOpt, allowPrincipalsOpt, denyHostsOpt, denyPrincipalsOpt)) //when --producer or --consumer is specified , user should not specify operations as they are inferred and we also disallow --deny-principals and --deny-hosts. 
- CommandLineUtils.checkInvalidArgs(parser, options, producerOpt, Set(operationsOpt, denyPrincipalsOpt, denyHostssOpt)) - CommandLineUtils.checkInvalidArgs(parser, options, consumerOpt, Set(operationsOpt, denyPrincipalsOpt, denyHostssOpt)) + CommandLineUtils.checkInvalidArgs(parser, options, producerOpt, Set(operationsOpt, denyPrincipalsOpt, denyHostsOpt)) + CommandLineUtils.checkInvalidArgs(parser, options, consumerOpt, Set(operationsOpt, denyPrincipalsOpt, denyHostsOpt)) if (options.has(producerOpt) && !options.has(topicOpt)) CommandLineUtils.printUsageAndDie(parser, "With --producer you must specify a --topic") - if (options.has(consumerOpt) && (!options.has(topicOpt) || !options.has(groupOpt) || (!options.has(producerOpt) && options.has(clusterOpt)))) - CommandLineUtils.printUsageAndDie(parser, "With --consumer you must specify a --topic and a --group and no --cluster option should be specified.") + if (options.has(idempotentOpt) && !options.has(producerOpt)) + CommandLineUtils.printUsageAndDie(parser, "The --idempotent option is only available if --producer is set") + + if (options.has(consumerOpt) && (!options.has(topicOpt) || !options.has(groupOpt) || (!options.has(producerOpt) && (options.has(clusterOpt) || options.has(transactionalIdOpt))))) + CommandLineUtils.printUsageAndDie(parser, "With --consumer you must specify a --topic and a --group and no --cluster or --transactional-id option should be specified.") } } http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala index 302fcb5..7bde4e2 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala @@ -323,6 +323,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState } def prepareTxnOffsetCommit(producerId: Long, offsets: Map[TopicPartition, OffsetAndMetadata]) { + trace(s"TxnOffsetCommit for producer $producerId and group $groupId with offsets $offsets is pending") receivedTransactionalOffsetCommits = true val producerOffsets = pendingTransactionalOffsetCommits.getOrElseUpdate(producerId, mutable.Map.empty[TopicPartition, CommitRecordMetadataAndOffset]) @@ -339,10 +340,12 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState /* Remove a pending transactional offset commit if the actual offset commit record was not written to the log. * We will return an error and the client will retry the request, potentially to a different coordinator. */ - def failPendingTxnOffsetCommit(producerId: Long, topicPartition: TopicPartition, offsetAndMetadata: OffsetAndMetadata): Unit = { + def failPendingTxnOffsetCommit(producerId: Long, topicPartition: TopicPartition): Unit = { pendingTransactionalOffsetCommits.get(producerId) match { case Some(pendingOffsets) => - pendingOffsets.remove(topicPartition) + val pendingOffsetCommit = pendingOffsets.remove(topicPartition) + trace(s"TxnOffsetCommit for producer $producerId and group $groupId with offsets $pendingOffsetCommit failed " + + s"to be appended to the log") if (pendingOffsets.isEmpty) pendingTransactionalOffsetCommits.remove(producerId) case _ => @@ -366,18 +369,28 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState * to the log. 
*/ def completePendingTxnOffsetCommit(producerId: Long, isCommit: Boolean): Unit = { - trace(s"Completing transactional offset commit for producer $producerId and group $groupId. isCommit: $isCommit") + val pendingOffsetsOpt = pendingTransactionalOffsetCommits.remove(producerId) if (isCommit) { - val producerOffsets = pendingTransactionalOffsetCommits.getOrElse(producerId, Map.empty[TopicPartition, CommitRecordMetadataAndOffset]) - producerOffsets.foreach { case (topicPartition, commitRecordMetadataAndOffset) => - if (commitRecordMetadataAndOffset.appendedBatchOffset.isEmpty) - throw new IllegalStateException(s"Trying to complete a transactional offset commit for producerId $producerId " + - s"and groupId $groupId even though the the offset commit record itself hasn't been appended to the log.") - if (!offsets.contains(topicPartition) || offsets(topicPartition).olderThan(commitRecordMetadataAndOffset)) - offsets.put(topicPartition, commitRecordMetadataAndOffset) + pendingOffsetsOpt.foreach { pendingOffsets => + pendingOffsets.foreach { case (topicPartition, commitRecordMetadataAndOffset) => + if (commitRecordMetadataAndOffset.appendedBatchOffset.isEmpty) + throw new IllegalStateException(s"Trying to complete a transactional offset commit for producerId $producerId " + + s"and groupId $groupId even though the offset commit record itself hasn't been appended to the log.") + + val currentOffsetOpt = offsets.get(topicPartition) + if (currentOffsetOpt.forall(_.olderThan(commitRecordMetadataAndOffset))) { + trace(s"TxnOffsetCommit for producer $producerId and group $groupId with offset $commitRecordMetadataAndOffset " + + "committed and loaded into the cache.") + offsets.put(topicPartition, commitRecordMetadataAndOffset) + } else { + trace(s"TxnOffsetCommit for producer $producerId and group $groupId with offset $commitRecordMetadataAndOffset " + + s"committed, but not loaded since its offset is older than current offset $currentOffsetOpt.") + } + } } + } else { + trace(s"TxnOffsetCommit for producer $producerId and group $groupId with offsets $pendingOffsetsOpt aborted") } - pendingTransactionalOffsetCommits.remove(producerId) } def activeProducers = pendingTransactionalOffsetCommits.keySet @@ -430,7 +443,13 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState } override def toString: String = { - "[%s,%s,%s,%s]".format(groupId, protocolType, currentState.toString, members) + "GroupMetadata(" + + s"groupId=$groupId, " + + s"generation=$generationId, " + + s"protocolType=$protocolType, " + + s"currentState=$currentState, " + + s"members=$members)" } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index a7eb28b..8e5135d 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -324,7 +324,7 @@ class GroupMetadataManager(brokerId: Int, removeProducerGroup(producerId, group.groupId) filteredOffsetMetadata.foreach { case (topicPartition, offsetAndMetadata) => if (isTxnOffsetCommit) - group.failPendingTxnOffsetCommit(producerId, topicPartition, offsetAndMetadata) + group.failPendingTxnOffsetCommit(producerId, topicPartition) else
http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index a7eb28b..8e5135d 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -324,7 +324,7 @@ class GroupMetadataManager(brokerId: Int,
               removeProducerGroup(producerId, group.groupId)
             filteredOffsetMetadata.foreach { case (topicPartition, offsetAndMetadata) =>
               if (isTxnOffsetCommit)
-                group.failPendingTxnOffsetCommit(producerId, topicPartition, offsetAndMetadata)
+                group.failPendingTxnOffsetCommit(producerId, topicPartition)
               else
                 group.failPendingOffsetWrite(topicPartition, offsetAndMetadata)
             }
@@ -536,7 +536,6 @@ class GroupMetadataManager(brokerId: Int,
             val groupId = groupMetadataKey.key
             val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, record.value)
             if (groupMetadata != null) {
-              trace(s"Loaded group metadata for group $groupId with generation ${groupMetadata.generationId}")
               removedGroups.remove(groupId)
               loadedGroups.put(groupId, groupMetadata)
             } else {
@@ -577,6 +576,7 @@ class GroupMetadataManager(brokerId: Int,
         loadedGroups.values.foreach { group =>
           val offsets = groupOffsets.getOrElse(group.groupId, Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
           val pendingOffsets = pendingGroupOffsets.getOrElse(group.groupId, Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
+          trace(s"Loaded group metadata $group with offsets $offsets and pending offsets $pendingOffsets")
           loadGroup(group, offsets, pendingOffsets)
           onGroupLoaded(group)
         }
@@ -587,6 +587,7 @@ class GroupMetadataManager(brokerId: Int,
           val group = new GroupMetadata(groupId)
           val offsets = emptyGroupOffsets.getOrElse(groupId, Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
           val pendingOffsets = pendingEmptyGroupOffsets.getOrElse(groupId, Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
+          trace(s"Loaded group metadata $group with offsets $offsets and pending offsets $pendingOffsets")
           loadGroup(group, offsets, pendingOffsets)
           onGroupLoaded(group)
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala b/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
index ce542b1..b082b9b 100644
--- a/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
@@ -85,7 +85,7 @@ private[group] class MemberMetadata(val memberId: String,
     if (protocols.size != this.supportedProtocols.size)
       return false
 
-    for (i <- 0 until protocols.size) {
+    for (i <- protocols.indices) {
       val p1 = protocols(i)
       val p2 = supportedProtocols(i)
       if (p1._1 != p2._1 || !util.Arrays.equals(p1._2, p2._2))
@@ -114,7 +114,15 @@ private[group] class MemberMetadata(val memberId: String,
     }
   }
 
-  override def toString = {
-    "[%s,%s,%s,%d]".format(memberId, clientId, clientHost, sessionTimeoutMs)
+  override def toString: String = {
+    "MemberMetadata(" +
+      s"memberId=$memberId, " +
+      s"clientId=$clientId, " +
+      s"clientHost=$clientHost, " +
+      s"sessionTimeoutMs=$sessionTimeoutMs, " +
+      s"rebalanceTimeoutMs=$rebalanceTimeoutMs, " +
+      s"supportedProtocols=${supportedProtocols.map(_._1)}, " +
+      ")"
   }
+
 }
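Note on the MemberMetadata loop above: protocols.indices produces the same Range as
0 until protocols.size but stays tied to the collection it indexes. A self-contained
illustration of the same pairwise comparison, with made-up protocol data:

    import java.util

    object ProtocolMatch {
      // Compares (protocol name, serialized metadata) pairs positionally,
      // mirroring the shape of MemberMetadata.matches.
      def matches(a: List[(String, Array[Byte])], b: List[(String, Array[Byte])]): Boolean = {
        if (a.size != b.size)
          return false
        for (i <- a.indices) {
          if (a(i)._1 != b(i)._1 || !util.Arrays.equals(a(i)._2, b(i)._2))
            return false
        }
        true
      }

      def main(args: Array[String]): Unit = {
        val p = List("range" -> Array[Byte](1, 2))
        println(matches(p, p))                                        // true
        println(matches(p, List("roundrobin" -> Array[Byte](1, 2)))) // false
      }
    }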
http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/core/src/main/scala/kafka/security/auth/Operation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/Operation.scala b/core/src/main/scala/kafka/security/auth/Operation.scala
index f65d9f0..420c3eb 100644
--- a/core/src/main/scala/kafka/security/auth/Operation.scala
+++ b/core/src/main/scala/kafka/security/auth/Operation.scala
@@ -65,6 +65,10 @@ case object AlterConfigs extends Operation {
   val name = "AlterConfigs"
   val toJava = AclOperation.ALTER_CONFIGS
 }
+case object IdempotentWrite extends Operation {
+  val name = "IdempotentWrite"
+  val toJava = AclOperation.IDEMPOTENT_WRITE
+}
 case object All extends Operation {
   val name = "All"
   val toJava = AclOperation.ALL
@@ -86,5 +90,5 @@ object Operation {
   }
 
   def values: Seq[Operation] = List(Read, Write, Create, Delete, Alter, Describe, ClusterAction, AlterConfigs,
-    DescribeConfigs, All)
+    DescribeConfigs, IdempotentWrite, All)
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/core/src/main/scala/kafka/security/auth/Resource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/Resource.scala b/core/src/main/scala/kafka/security/auth/Resource.scala
index a0ed9f9..311f5b5 100644
--- a/core/src/main/scala/kafka/security/auth/Resource.scala
+++ b/core/src/main/scala/kafka/security/auth/Resource.scala
@@ -21,7 +21,6 @@ object Resource {
   val ClusterResourceName = "kafka-cluster"
   val ClusterResource = new Resource(Cluster, Resource.ClusterResourceName)
   val ProducerIdResourceName = "producer-id"
-  val ProducerIdResource = new Resource(Cluster, Resource.ProducerIdResourceName)
   val WildCardResource = "*"
 
   def fromString(str: String): Resource = {
@@ -38,7 +37,7 @@ object Resource {
  * @param name name of the resource, for topic this will be topic name , for group it will be group name. For cluster type
  *             it will be a constant string kafka-cluster.
  */
-case class Resource(val resourceType: ResourceType, val name: String) {
+case class Resource(resourceType: ResourceType, name: String) {
 
   override def toString: String = {
     resourceType.name + Resource.Separator + name

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/core/src/main/scala/kafka/security/auth/ResourceType.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/ResourceType.scala b/core/src/main/scala/kafka/security/auth/ResourceType.scala
index ea7ce3c..9cfe1cd 100644
--- a/core/src/main/scala/kafka/security/auth/ResourceType.scala
+++ b/core/src/main/scala/kafka/security/auth/ResourceType.scala
@@ -19,11 +19,6 @@ package kafka.security.auth
 import kafka.common.{BaseEnum, KafkaException}
 import org.apache.kafka.common.protocol.Errors
 
-/**
- * ResourceTypes.
- */
-
-
 sealed trait ResourceType extends BaseEnum { def error: Errors }
 
 case object Cluster extends ResourceType {
@@ -46,16 +41,11 @@ case object Group extends ResourceType {
   val error = Errors.GROUP_AUTHORIZATION_FAILED
 }
 
-case object ProducerTransactionalId extends ResourceType {
-  val name = "ProducerTransactionalId"
+case object TransactionalId extends ResourceType {
+  val name = "TransactionalId"
   val error = Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED
 }
 
-case object ProducerIdResource extends ResourceType {
-  val name = "ProducerIdResource"
-  val error = Errors.PRODUCER_ID_AUTHORIZATION_FAILED
-}
-
 object ResourceType {
 
   def fromString(resourceType: String): ResourceType = {
@@ -63,5 +53,5 @@ object ResourceType {
     rType.getOrElse(throw new KafkaException(resourceType + " not a valid resourceType name. The valid names are " + values.mkString(",")))
   }
 
-  def values: Seq[ResourceType] = List(Cluster, Topic, Group, ProducerTransactionalId, ProducerIdResource, Broker)
+  def values: Seq[ResourceType] = List(Cluster, Topic, Group, TransactionalId, Broker)
 }
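Note on the security classes above: with ProducerIdResource gone, idempotent producers are
authorized via the new IdempotentWrite operation against the existing cluster resource, while
transactional producers are checked against the renamed TransactionalId resource type. A
hedged sketch of how the renamed objects compose, assuming only the definitions shown in this
patch (the transactional id "my-txn-id" is made up):

    import kafka.security.auth.{IdempotentWrite, Resource, TransactionalId}

    object AuthModelSketch {
      def main(args: Array[String]): Unit = {
        // Idempotent writes are gated on the single cluster resource...
        val clusterResource = Resource.ClusterResource
        println(s"${IdempotentWrite.name} on $clusterResource")
        // ...while transactions are gated per transactional id.
        val txnResource = Resource(TransactionalId, "my-txn-id")
        println(s"Write on $txnResource")
      }
    }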
