Repository: kafka Updated Branches: refs/heads/trunk c26545ea5 -> 018679410
KAFKA-5248; Remove unused/unneeded retention time in TxnOffsetCommitRequest Author: Jason Gustafson <[email protected]> Reviewers: Guozhang Wang <[email protected]>, Apurva Mehta <[email protected]>, Ismael Juma <[email protected]> Closes #3058 from hachikuji/KAFKA-5248 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/01867941 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/01867941 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/01867941 Branch: refs/heads/trunk Commit: 0186794104b7106fe426024d65730fec79ad999a Parents: c26545e Author: Jason Gustafson <[email protected]> Authored: Mon May 15 12:58:36 2017 -0700 Committer: Jason Gustafson <[email protected]> Committed: Mon May 15 12:58:50 2017 -0700 ---------------------------------------------------------------------- .../producer/internals/TransactionManager.java | 3 +-- .../org/apache/kafka/common/protocol/Protocol.java | 3 --- .../common/requests/TxnOffsetCommitRequest.java | 17 +++-------------- .../kafka/common/requests/RequestResponseTest.java | 2 +- .../scala/unit/kafka/server/RequestQuotaTest.scala | 2 +- 5 files changed, 6 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/01867941/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 7e2f813..f3ed252 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -37,7 +37,6 @@ import org.apache.kafka.common.requests.FindCoordinatorRequest; import org.apache.kafka.common.requests.FindCoordinatorResponse; import org.apache.kafka.common.requests.InitProducerIdRequest; import org.apache.kafka.common.requests.InitProducerIdResponse; -import org.apache.kafka.common.requests.OffsetCommitRequest; import org.apache.kafka.common.requests.TransactionResult; import org.apache.kafka.common.requests.TxnOffsetCommitRequest; import org.apache.kafka.common.requests.TxnOffsetCommitRequest.CommittedOffset; @@ -461,7 +460,7 @@ public class TransactionManager { pendingTxnOffsetCommits.put(entry.getKey(), committedOffset); } TxnOffsetCommitRequest.Builder builder = new TxnOffsetCommitRequest.Builder(consumerGroupId, - producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, OffsetCommitRequest.DEFAULT_RETENTION_TIME, + producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, pendingTxnOffsetCommits); return new TxnOffsetCommitHandler(result, builder); } http://git-wip-us.apache.org/repos/asf/kafka/blob/01867941/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java index fb3c8c9..5e05738 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java @@ -1521,9 +1521,6 @@ public class Protocol { new Field("producer_epoch", INT16, "Current epoch associated with the producer id."), - new Field("retention_time", - INT64, - "The time in ms to retain the offset."), new Field("topics", new ArrayOf(new Schema( new Field("topic", STRING), http://git-wip-us.apache.org/repos/asf/kafka/blob/01867941/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java index 3f3024f..f5334f2 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java @@ -30,7 +30,6 @@ public class TxnOffsetCommitRequest extends AbstractRequest { private static final String CONSUMER_GROUP_ID_KEY_NAME = "consumer_group_id"; private static final String PRODUCER_ID_KEY_NAME = "producer_id"; private static final String PRODUCER_EPOCH_KEY_NAME = "producer_epoch"; - private static final String RETENTION_TIME_KEY_NAME = "retention_time"; private static final String TOPIC_PARTITIONS_KEY_NAME = "topics"; private static final String TOPIC_KEY_NAME = "topic"; private static final String PARTITIONS_KEY_NAME = "partitions"; @@ -42,16 +41,14 @@ public class TxnOffsetCommitRequest extends AbstractRequest { private final String consumerGroupId; private final long producerId; private final short producerEpoch; - private final long retentionTimeMs; private final Map<TopicPartition, CommittedOffset> offsets; - public Builder(String consumerGroupId, long producerId, short producerEpoch, long retentionTimeMs, + public Builder(String consumerGroupId, long producerId, short producerEpoch, Map<TopicPartition, CommittedOffset> offsets) { super(ApiKeys.TXN_OFFSET_COMMIT); this.consumerGroupId = consumerGroupId; this.producerId = producerId; this.producerEpoch = producerEpoch; - this.retentionTimeMs = retentionTimeMs; this.offsets = offsets; } @@ -61,23 +58,21 @@ public class TxnOffsetCommitRequest extends AbstractRequest { @Override public TxnOffsetCommitRequest build(short version) { - return new TxnOffsetCommitRequest(version, consumerGroupId, producerId, producerEpoch, retentionTimeMs, offsets); + return new TxnOffsetCommitRequest(version, consumerGroupId, producerId, producerEpoch, offsets); } } private final String consumerGroupId; private final long producerId; private final short producerEpoch; - private final long retentionTimeMs; private final Map<TopicPartition, CommittedOffset> offsets; public TxnOffsetCommitRequest(short version, String consumerGroupId, long producerId, short producerEpoch, - long retentionTimeMs, Map<TopicPartition, CommittedOffset> offsets) { + Map<TopicPartition, CommittedOffset> offsets) { super(version); this.consumerGroupId = consumerGroupId; this.producerId = producerId; this.producerEpoch = producerEpoch; - this.retentionTimeMs = retentionTimeMs; this.offsets = offsets; } @@ -86,7 +81,6 @@ public class TxnOffsetCommitRequest extends AbstractRequest { this.consumerGroupId = struct.getString(CONSUMER_GROUP_ID_KEY_NAME); this.producerId = struct.getLong(PRODUCER_ID_KEY_NAME); this.producerEpoch = struct.getShort(PRODUCER_EPOCH_KEY_NAME); - this.retentionTimeMs = struct.getLong(RETENTION_TIME_KEY_NAME); Map<TopicPartition, CommittedOffset> offsets = new HashMap<>(); Object[] topicPartitionsArray = struct.getArray(TOPIC_PARTITIONS_KEY_NAME); @@ -116,10 +110,6 @@ public class TxnOffsetCommitRequest extends AbstractRequest { return producerEpoch; } - public long retentionTimeMs() { - return retentionTimeMs; - } - public Map<TopicPartition, CommittedOffset> offsets() { return offsets; } @@ -130,7 +120,6 @@ public class TxnOffsetCommitRequest extends AbstractRequest { struct.set(CONSUMER_GROUP_ID_KEY_NAME, consumerGroupId); struct.set(PRODUCER_ID_KEY_NAME, producerId); struct.set(PRODUCER_EPOCH_KEY_NAME, producerEpoch); - struct.set(RETENTION_TIME_KEY_NAME, retentionTimeMs); Map<String, Map<Integer, CommittedOffset>> mappedPartitionOffsets = CollectionUtils.groupDataByTopic(offsets); Object[] partitionsArray = new Object[mappedPartitionOffsets.size()]; http://git-wip-us.apache.org/repos/asf/kafka/blob/01867941/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index cbfb6a9..1cfd6a3 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -951,7 +951,7 @@ public class RequestResponseTest { final Map<TopicPartition, TxnOffsetCommitRequest.CommittedOffset> offsets = new HashMap<>(); offsets.put(new TopicPartition("topic", 73), new TxnOffsetCommitRequest.CommittedOffset(100, null)); - return new TxnOffsetCommitRequest.Builder("gid", 21L, (short) 42, 73, offsets).build(); + return new TxnOffsetCommitRequest.Builder("gid", 21L, (short) 42, offsets).build(); } private TxnOffsetCommitResponse createTxnOffsetCommitResponse() { http://git-wip-us.apache.org/repos/asf/kafka/blob/01867941/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index 425b9f1..4cf3e7d 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -270,7 +270,7 @@ class RequestQuotaTest extends BaseRequestTest { new WriteTxnMarkersRequest.Builder(List.empty.asJava) case ApiKeys.TXN_OFFSET_COMMIT => - new TxnOffsetCommitRequest.Builder("test-txn-group", 2, 0, 3600, Map.empty.asJava) + new TxnOffsetCommitRequest.Builder("test-txn-group", 2, 0, Map.empty.asJava) case key => throw new IllegalArgumentException("Unsupported API key " + apiKey)
