http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index e38e583..0a0f3d9 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -44,6 +44,7 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.ControlRecordType; import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.record.EndTransactionMarker; import org.apache.kafka.common.record.LegacyRecord; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.MemoryRecordsBuilder; @@ -181,16 +182,18 @@ public class FetcherTest { assertEquals(1, fetcher.sendFetches()); assertFalse(fetcher.hasCompletedFetches()); + long producerId = 1; + short epoch = 0; + int baseSequence = 0; + ByteBuffer buffer = ByteBuffer.allocate(1024); - MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 0L); - builder.append(0L, "key".getBytes(), null); - builder.appendControlRecord(0L, ControlRecordType.COMMIT, null); + MemoryRecordsBuilder builder = MemoryRecords.idempotentBuilder(buffer, CompressionType.NONE, 0L, producerId, + epoch, baseSequence); builder.append(0L, "key".getBytes(), null); builder.close(); - builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 3L); - builder.appendControlRecord(0L, ControlRecordType.ABORT, null); - builder.close(); + MemoryRecords.writeEndTransactionalMarker(buffer, 1L, producerId, epoch, new EndTransactionMarker(ControlRecordType.ABORT, 0) + ); buffer.flip(); @@ -202,10 +205,11 @@ public class FetcherTest { assertTrue(partitionRecords.containsKey(tp1)); List<ConsumerRecord<byte[], byte[]>> records = partitionRecords.get(tp1); - assertEquals(2, records.size()); - assertEquals(4L, subscriptions.position(tp1).longValue()); - for (ConsumerRecord<byte[], byte[]> record : records) - assertArrayEquals("key".getBytes(), record.key()); + assertEquals(1, records.size()); + assertEquals(2L, subscriptions.position(tp1).longValue()); + + ConsumerRecord<byte[], byte[]> record = records.get(0); + assertArrayEquals("key".getBytes(), record.key()); } @Test @@ -814,6 +818,29 @@ public class FetcherTest { } @Test + public void testListOffsetsSendsIsolationLevel() { + for (final IsolationLevel isolationLevel : IsolationLevel.values()) { + Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(), new ByteArrayDeserializer(), + new ByteArrayDeserializer(), Integer.MAX_VALUE, isolationLevel); + + subscriptions.assignFromUser(singleton(tp1)); + subscriptions.needOffsetReset(tp1, OffsetResetStrategy.LATEST); + + client.prepareResponse(new MockClient.RequestMatcher() { + @Override + public boolean matches(AbstractRequest body) { + ListOffsetRequest request = (ListOffsetRequest) body; + return request.isolationLevel() == isolationLevel; + } + }, listOffsetResponse(Errors.NONE, 1L, 5L)); + fetcher.updateFetchPositions(singleton(tp1)); + assertFalse(subscriptions.isOffsetResetNeeded(tp1)); + assertTrue(subscriptions.isFetchable(tp1)); + assertEquals(5, subscriptions.position(tp1).longValue()); + } + } + + @Test public void testUpdateFetchPositionResetToEarliestOffset() { subscriptions.assignFromUser(singleton(tp1)); subscriptions.needOffsetReset(tp1, OffsetResetStrategy.EARLIEST); @@ -1206,7 +1233,7 @@ public class FetcherTest { new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()), new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes())); - currentOffset += abortTransaction(buffer, 1L, currentOffset, time.milliseconds()); + currentOffset += abortTransaction(buffer, 1L, currentOffset); buffer.flip(); @@ -1240,7 +1267,7 @@ public class FetcherTest { new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()), new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes())); - currentOffset += commitTransaction(buffer, 1L, currentOffset, time.milliseconds()); + currentOffset += commitTransaction(buffer, 1L, currentOffset); buffer.flip(); List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<>(); @@ -1278,7 +1305,7 @@ public class FetcherTest { List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<>(); int currOffset = 0; - // Appends for producer 1 (evetually committed) + // Appends for producer 1 (eventually committed) currOffset += appendTransactionalRecords(buffer, 1L, currOffset, new SimpleRecord(time.milliseconds(), "commit1-1".getBytes(), "value".getBytes()), new SimpleRecord(time.milliseconds(), "commit1-2".getBytes(), "value".getBytes())); @@ -1288,13 +1315,13 @@ public class FetcherTest { new SimpleRecord(time.milliseconds(), "abort2-1".getBytes(), "value".getBytes())); // commit producer 1 - currOffset += commitTransaction(buffer, 1L, currOffset, time.milliseconds()); + currOffset += commitTransaction(buffer, 1L, currOffset); // append more for producer 2 (eventually aborted) currOffset += appendTransactionalRecords(buffer, 2L, currOffset, new SimpleRecord(time.milliseconds(), "abort2-2".getBytes(), "value".getBytes())); // abort producer 2 - currOffset += abortTransaction(buffer, 2L, currOffset, time.milliseconds()); + currOffset += abortTransaction(buffer, 2L, currOffset); abortedTransactions.add(new FetchResponse.AbortedTransaction(2, 2)); // New transaction for producer 1 (eventually aborted) @@ -1310,11 +1337,11 @@ public class FetcherTest { new SimpleRecord(time.milliseconds(), "abort1-2".getBytes(), "value".getBytes())); // abort producer 1 - currOffset += abortTransaction(buffer, 1L, currOffset, time.milliseconds()); + currOffset += abortTransaction(buffer, 1L, currOffset); abortedTransactions.add(new FetchResponse.AbortedTransaction(1, 6)); // commit producer 2 - currOffset += commitTransaction(buffer, 2L, currOffset, time.milliseconds()); + currOffset += commitTransaction(buffer, 2L, currOffset); buffer.flip(); @@ -1335,12 +1362,11 @@ public class FetcherTest { assertTrue(fetchedRecords.containsKey(tp1)); // There are only 3 committed records List<ConsumerRecord<byte[], byte[]>> fetchedConsumerRecords = fetchedRecords.get(tp1); - Set<String> committedKeys = new HashSet<>(Arrays.asList("commit1-1", "commit1-2", "commit2-1")); - Set<String> actuallyCommittedKeys = new HashSet<>(); + Set<String> fetchedKeys = new HashSet<>(); for (ConsumerRecord<byte[], byte[]> consumerRecord : fetchedConsumerRecords) { - actuallyCommittedKeys.add(new String(consumerRecord.key(), StandardCharsets.UTF_8)); + fetchedKeys.add(new String(consumerRecord.key(), StandardCharsets.UTF_8)); } - assertTrue(actuallyCommittedKeys.equals(committedKeys)); + assertEquals(Utils.mkSet("commit1-1", "commit1-2", "commit2-1"), fetchedKeys); } @Test @@ -1354,14 +1380,14 @@ public class FetcherTest { new SimpleRecord(time.milliseconds(), "abort1-1".getBytes(), "value".getBytes()), new SimpleRecord(time.milliseconds(), "abort1-2".getBytes(), "value".getBytes())); - currentOffset += abortTransaction(buffer, 1L, currentOffset, time.milliseconds()); + currentOffset += abortTransaction(buffer, 1L, currentOffset); // Duplicate abort -- should be ignored. - currentOffset += abortTransaction(buffer, 1L, currentOffset, time.milliseconds()); + currentOffset += abortTransaction(buffer, 1L, currentOffset); // Now commit a transaction. currentOffset += appendTransactionalRecords(buffer, 1L, currentOffset, new SimpleRecord(time.milliseconds(), "commit1-1".getBytes(), "value".getBytes()), new SimpleRecord(time.milliseconds(), "commit1-2".getBytes(), "value".getBytes())); - currentOffset += commitTransaction(buffer, 1L, currentOffset, time.milliseconds()); + currentOffset += commitTransaction(buffer, 1L, currentOffset); buffer.flip(); List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<>(); @@ -1402,7 +1428,7 @@ public class FetcherTest { new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()), new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes())); - currentOffset += abortTransaction(buffer, 1L, currentOffset, time.milliseconds()); + currentOffset += abortTransaction(buffer, 1L, currentOffset); buffer.flip(); @@ -1436,7 +1462,7 @@ public class FetcherTest { new SimpleRecord(time.milliseconds(), "abort1-1".getBytes(), "value".getBytes()), new SimpleRecord(time.milliseconds(), "abort1-2".getBytes(), "value".getBytes())); - currentOffset += abortTransaction(buffer, 1L, currentOffset, time.milliseconds()); + currentOffset += abortTransaction(buffer, 1L, currentOffset); buffer.flip(); List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<>(); @@ -1463,7 +1489,8 @@ public class FetcherTest { private int appendTransactionalRecords(ByteBuffer buffer, long pid, long baseOffset, SimpleRecord... records) { MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, - TimestampType.LOG_APPEND_TIME, baseOffset, time.milliseconds(), pid, (short) 0, (int) baseOffset, true, RecordBatch.NO_PARTITION_LEADER_EPOCH); + TimestampType.LOG_APPEND_TIME, baseOffset, time.milliseconds(), pid, (short) 0, (int) baseOffset, true, + RecordBatch.NO_PARTITION_LEADER_EPOCH); for (SimpleRecord record : records) { builder.append(record); @@ -1472,19 +1499,15 @@ public class FetcherTest { return records.length; } - private int commitTransaction(ByteBuffer buffer, long pid, int baseOffset, long timestamp) { - MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, - TimestampType.LOG_APPEND_TIME, baseOffset, time.milliseconds(), pid, (short) 0, baseOffset, true, RecordBatch.NO_PARTITION_LEADER_EPOCH); - builder.appendControlRecord(timestamp, ControlRecordType.COMMIT, null); - builder.build(); + private int commitTransaction(ByteBuffer buffer, long producerId, int baseOffset) { + MemoryRecords.writeEndTransactionalMarker(buffer, baseOffset, producerId, (short) 0, + new EndTransactionMarker(ControlRecordType.COMMIT, 0)); return 1; } - private int abortTransaction(ByteBuffer buffer, long pid, long baseOffset, long timestamp) { - MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, - TimestampType.LOG_APPEND_TIME, baseOffset, time.milliseconds(), pid, (short) 0, (int) baseOffset, true, RecordBatch.NO_PARTITION_LEADER_EPOCH); - builder.appendControlRecord(timestamp, ControlRecordType.ABORT, null); - builder.build(); + private int abortTransaction(ByteBuffer buffer, long producerId, long baseOffset) { + MemoryRecords.writeEndTransactionalMarker(buffer, baseOffset, producerId, (short) 0, + new EndTransactionMarker(ControlRecordType.ABORT, 0)); return 1; }
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java index 57f4663..ec858aa 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java @@ -224,25 +224,32 @@ public class DefaultRecordBatchTest { } @Test - public void testReadAndWriteControlRecord() { + public void testReadAndWriteControlBatch() { + long producerId = 1L; + short producerEpoch = 0; + int coordinatorEpoch = 15; + ByteBuffer buffer = ByteBuffer.allocate(128); - MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, - TimestampType.CREATE_TIME, 0L); + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, + CompressionType.NONE, TimestampType.CREATE_TIME, 0L, RecordBatch.NO_TIMESTAMP, producerId, + producerEpoch, RecordBatch.NO_SEQUENCE, true, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, + buffer.remaining()); - builder.appendControlRecord(System.currentTimeMillis(), ControlRecordType.COMMIT, null); - builder.appendControlRecord(System.currentTimeMillis(), ControlRecordType.ABORT, null); + EndTransactionMarker marker = new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch); + builder.appendEndTxnMarker(System.currentTimeMillis(), marker); MemoryRecords records = builder.build(); + List<MutableRecordBatch> batches = TestUtils.toList(records.batches()); + assertEquals(1, batches.size()); + + MutableRecordBatch batch = batches.get(0); + assertTrue(batch.isControlBatch()); + List<Record> logRecords = TestUtils.toList(records.records()); - assertEquals(2, logRecords.size()); + assertEquals(1, logRecords.size()); Record commitRecord = logRecords.get(0); - assertTrue(commitRecord.isControlRecord()); - assertEquals(ControlRecordType.COMMIT, ControlRecordType.parse(commitRecord.key())); - - Record abortRecord = logRecords.get(1); - assertTrue(abortRecord.isControlRecord()); - assertEquals(ControlRecordType.ABORT, ControlRecordType.parse(abortRecord.key())); + assertEquals(marker, EndTransactionMarker.deserialize(commitRecord)); } @Test http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java index 251db15..61b7b00 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java @@ -21,7 +21,6 @@ import org.apache.kafka.common.header.internals.RecordHeader; import org.junit.Test; import java.nio.ByteBuffer; -import java.util.Arrays; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; @@ -45,31 +44,27 @@ public class DefaultRecordTest { new SimpleRecord(15L, "hi".getBytes(), "there".getBytes(), headers) }; - for (boolean isControlRecord : Arrays.asList(true, false)) { - for (SimpleRecord record : records) { - int baseSequence = 723; - long baseOffset = 37; - int offsetDelta = 10; - long baseTimestamp = System.currentTimeMillis(); - long timestampDelta = 323; + for (SimpleRecord record : records) { + int baseSequence = 723; + long baseOffset = 37; + int offsetDelta = 10; + long baseTimestamp = System.currentTimeMillis(); + long timestampDelta = 323; - ByteBuffer buffer = ByteBuffer.allocate(1024); - DefaultRecord.writeTo(buffer, isControlRecord, offsetDelta, timestampDelta, record.key(), - record.value(), record.headers()); - buffer.flip(); + ByteBuffer buffer = ByteBuffer.allocate(1024); + DefaultRecord.writeTo(buffer, offsetDelta, timestampDelta, record.key(), record.value(), record.headers()); + buffer.flip(); - DefaultRecord logRecord = DefaultRecord.readFrom(buffer, baseOffset, baseTimestamp, baseSequence, null); - assertNotNull(logRecord); - assertEquals(baseOffset + offsetDelta, logRecord.offset()); - assertEquals(baseSequence + offsetDelta, logRecord.sequence()); - assertEquals(baseTimestamp + timestampDelta, logRecord.timestamp()); - assertEquals(record.key(), logRecord.key()); - assertEquals(record.value(), logRecord.value()); - assertEquals(isControlRecord, logRecord.isControlRecord()); - assertArrayEquals(record.headers(), logRecord.headers()); - assertEquals(DefaultRecord.sizeInBytes(offsetDelta, timestampDelta, record.key(), record.value(), - record.headers()), logRecord.sizeInBytes()); - } + DefaultRecord logRecord = DefaultRecord.readFrom(buffer, baseOffset, baseTimestamp, baseSequence, null); + assertNotNull(logRecord); + assertEquals(baseOffset + offsetDelta, logRecord.offset()); + assertEquals(baseSequence + offsetDelta, logRecord.sequence()); + assertEquals(baseTimestamp + timestampDelta, logRecord.timestamp()); + assertEquals(record.key(), logRecord.key()); + assertEquals(record.value(), logRecord.value()); + assertArrayEquals(record.headers(), logRecord.headers()); + assertEquals(DefaultRecord.sizeInBytes(offsetDelta, timestampDelta, record.key(), record.value(), + record.headers()), logRecord.sizeInBytes()); } } @@ -83,7 +78,7 @@ public class DefaultRecordTest { long timestampDelta = 323; ByteBuffer buffer = ByteBuffer.allocate(1024); - DefaultRecord.writeTo(buffer, false, offsetDelta, timestampDelta, key, value, new Header[0]); + DefaultRecord.writeTo(buffer, offsetDelta, timestampDelta, key, value, new Header[0]); buffer.flip(); DefaultRecord record = DefaultRecord.readFrom(buffer, baseOffset, baseTimestamp, RecordBatch.NO_SEQUENCE, null); http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/test/java/org/apache/kafka/common/record/EndTransactionMarkerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/record/EndTransactionMarkerTest.java b/clients/src/test/java/org/apache/kafka/common/record/EndTransactionMarkerTest.java new file mode 100644 index 0000000..903f674 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/record/EndTransactionMarkerTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.record; + +import org.junit.Test; + +import java.nio.ByteBuffer; + +import static org.junit.Assert.assertEquals; + +public class EndTransactionMarkerTest { + + @Test(expected = IllegalArgumentException.class) + public void testUnknownControlTypeNotAllowed() { + new EndTransactionMarker(ControlRecordType.UNKNOWN, 24); + } + + @Test(expected = IllegalArgumentException.class) + public void testCannotDeserializeUnknownControlType() { + EndTransactionMarker.deserializeValue(ControlRecordType.UNKNOWN, ByteBuffer.wrap(new byte[0])); + } + + @Test(expected = InvalidRecordException.class) + public void testIllegalNegativeVersion() { + ByteBuffer buffer = ByteBuffer.allocate(2); + buffer.putShort((short) -1); + buffer.flip(); + EndTransactionMarker.deserializeValue(ControlRecordType.ABORT, buffer); + } + + @Test(expected = InvalidRecordException.class) + public void testNotEnoughBytes() { + EndTransactionMarker.deserializeValue(ControlRecordType.COMMIT, ByteBuffer.wrap(new byte[0])); + } + + @Test + public void testSerde() { + int coordinatorEpoch = 79; + EndTransactionMarker marker = new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch); + ByteBuffer buffer = marker.serializeValue(); + EndTransactionMarker deserialized = EndTransactionMarker.deserializeValue(ControlRecordType.COMMIT, buffer); + assertEquals(coordinatorEpoch, deserialized.coordinatorEpoch()); + } + + @Test + public void testDeserializeNewerVersion() { + int coordinatorEpoch = 79; + ByteBuffer buffer = ByteBuffer.allocate(8); + buffer.putShort((short) 5); + buffer.putInt(coordinatorEpoch); + buffer.putShort((short) 0); // unexpected data + buffer.flip(); + EndTransactionMarker deserialized = EndTransactionMarker.deserializeValue(ControlRecordType.COMMIT, buffer); + assertEquals(coordinatorEpoch, deserialized.coordinatorEpoch()); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java index 294f2f8..11ee419 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java @@ -140,25 +140,25 @@ public class FileRecordsTest { int message1Size = batches.get(0).sizeInBytes(); assertEquals("Should be able to find the first message by its offset", - new FileRecords.LogEntryPosition(0L, position, message1Size), + new FileRecords.LogOffsetPosition(0L, position, message1Size), fileRecords.searchForOffsetWithSize(0, 0)); position += message1Size; int message2Size = batches.get(1).sizeInBytes(); assertEquals("Should be able to find second message when starting from 0", - new FileRecords.LogEntryPosition(1L, position, message2Size), + new FileRecords.LogOffsetPosition(1L, position, message2Size), fileRecords.searchForOffsetWithSize(1, 0)); assertEquals("Should be able to find second message starting from its offset", - new FileRecords.LogEntryPosition(1L, position, message2Size), + new FileRecords.LogOffsetPosition(1L, position, message2Size), fileRecords.searchForOffsetWithSize(1, position)); position += message2Size + batches.get(2).sizeInBytes(); int message4Size = batches.get(3).sizeInBytes(); assertEquals("Should be able to find fourth message from a non-existant offset", - new FileRecords.LogEntryPosition(50L, position, message4Size), + new FileRecords.LogOffsetPosition(50L, position, message4Size), fileRecords.searchForOffsetWithSize(3, position)); assertEquals("Should be able to find fourth message by correct offset", - new FileRecords.LogEntryPosition(50L, position, message4Size), + new FileRecords.LogOffsetPosition(50L, position, message4Size), fileRecords.searchForOffsetWithSize(50, position)); } @@ -241,7 +241,6 @@ public class FileRecordsTest { EasyMock.expect(channelMock.size()).andReturn(42L).atLeastOnce(); EasyMock.expect(channelMock.position(42L)).andReturn(null).once(); EasyMock.expect(channelMock.truncate(23L)).andReturn(null).once(); - EasyMock.expect(channelMock.position(23L)).andReturn(null).once(); EasyMock.replay(channelMock); FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false); http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java index 330879f..0467522 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java @@ -50,7 +50,7 @@ public class MemoryRecordsBuilderTest { MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType, TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, - false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); MemoryRecords records = builder.build(); assertEquals(0, records.sizeInBytes()); assertEquals(bufferOffset, buffer.position()); @@ -66,8 +66,8 @@ public class MemoryRecordsBuilderTest { int sequence = 2342; MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, - TimestampType.CREATE_TIME, 0L, 0L, pid, epoch, sequence, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, - buffer.capacity()); + TimestampType.CREATE_TIME, 0L, 0L, pid, epoch, sequence, true, false, + RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); builder.append(System.currentTimeMillis(), "foo".getBytes(), "bar".getBytes()); MemoryRecords records = builder.build(); @@ -86,7 +86,7 @@ public class MemoryRecordsBuilderTest { int sequence = 2342; new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType, TimestampType.CREATE_TIME, - 0L, 0L, pid, epoch, sequence, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + 0L, 0L, pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); } @Test(expected = IllegalArgumentException.class) @@ -99,7 +99,33 @@ public class MemoryRecordsBuilderTest { int sequence = 2342; new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType, TimestampType.CREATE_TIME, - 0L, 0L, pid, epoch, sequence, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + 0L, 0L, pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + } + + @Test(expected = IllegalArgumentException.class) + public void testWriteControlBatchNotAllowedMagicV0() { + ByteBuffer buffer = ByteBuffer.allocate(128); + buffer.position(bufferOffset); + + long pid = 9809; + short epoch = 15; + int sequence = 2342; + + new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType, TimestampType.CREATE_TIME, + 0L, 0L, pid, epoch, sequence, false, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + } + + @Test(expected = IllegalArgumentException.class) + public void testWriteControlBatchNotAllowedMagicV1() { + ByteBuffer buffer = ByteBuffer.allocate(128); + buffer.position(bufferOffset); + + long pid = 9809; + short epoch = 15; + int sequence = 2342; + + new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType, TimestampType.CREATE_TIME, + 0L, 0L, pid, epoch, sequence, false, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); } @Test(expected = IllegalArgumentException.class) @@ -112,7 +138,7 @@ public class MemoryRecordsBuilderTest { int sequence = 2342; MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME, - 0L, 0L, pid, epoch, sequence, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + 0L, 0L, pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); builder.close(); } @@ -126,7 +152,7 @@ public class MemoryRecordsBuilderTest { int sequence = 2342; MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME, - 0L, 0L, pid, epoch, sequence, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + 0L, 0L, pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); builder.close(); } @@ -140,10 +166,38 @@ public class MemoryRecordsBuilderTest { int sequence = RecordBatch.NO_SEQUENCE; MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME, - 0L, 0L, pid, epoch, sequence, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + 0L, 0L, pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); builder.close(); } + @Test(expected = IllegalArgumentException.class) + public void testWriteEndTxnMarkerNonTransactionalBatch() { + ByteBuffer buffer = ByteBuffer.allocate(128); + buffer.position(bufferOffset); + + long pid = 9809; + short epoch = 15; + int sequence = RecordBatch.NO_SEQUENCE; + + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME, + 0L, 0L, pid, epoch, sequence, false, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + builder.appendEndTxnMarker(RecordBatch.NO_TIMESTAMP, new EndTransactionMarker(ControlRecordType.ABORT, 0)); + } + + @Test(expected = IllegalArgumentException.class) + public void testWriteEndTxnMarkerNonControlBatch() { + ByteBuffer buffer = ByteBuffer.allocate(128); + buffer.position(bufferOffset); + + long pid = 9809; + short epoch = 15; + int sequence = RecordBatch.NO_SEQUENCE; + + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME, + 0L, 0L, pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + builder.appendEndTxnMarker(RecordBatch.NO_TIMESTAMP, new EndTransactionMarker(ControlRecordType.ABORT, 0)); + } + @Test public void testCompressionRateV0() { ByteBuffer buffer = ByteBuffer.allocate(1024); @@ -157,7 +211,7 @@ public class MemoryRecordsBuilderTest { MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType, TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, - false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); int uncompressedSize = 0; for (LegacyRecord record : records) { @@ -188,7 +242,7 @@ public class MemoryRecordsBuilderTest { MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType, TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, - false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); int uncompressedSize = 0; for (LegacyRecord record : records) { @@ -214,7 +268,7 @@ public class MemoryRecordsBuilderTest { long logAppendTime = System.currentTimeMillis(); MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType, TimestampType.LOG_APPEND_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, - RecordBatch.NO_SEQUENCE, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + RecordBatch.NO_SEQUENCE, false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); builder.append(0L, "a".getBytes(), "1".getBytes()); builder.append(0L, "b".getBytes(), "2".getBytes()); builder.append(0L, "c".getBytes(), "3".getBytes()); @@ -243,7 +297,7 @@ public class MemoryRecordsBuilderTest { long logAppendTime = System.currentTimeMillis(); MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType, TimestampType.CREATE_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, - false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); builder.append(0L, "a".getBytes(), "1".getBytes()); builder.append(2L, "b".getBytes(), "2".getBytes()); builder.append(1L, "c".getBytes(), "3".getBytes()); @@ -276,7 +330,7 @@ public class MemoryRecordsBuilderTest { ByteBuffer buffer = ByteBuffer.allocate(512); MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME, 0L, LegacyRecord.NO_TIMESTAMP, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, - RecordBatch.NO_SEQUENCE, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, writeLimit); + RecordBatch.NO_SEQUENCE, false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, writeLimit); assertFalse(builder.isFull()); assertTrue(builder.hasRoomFor(0L, key, value)); @@ -302,7 +356,7 @@ public class MemoryRecordsBuilderTest { long logAppendTime = System.currentTimeMillis(); MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType, TimestampType.CREATE_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, - false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); builder.append(0L, "a".getBytes(), "1".getBytes()); builder.append(1L, "b".getBytes(), "2".getBytes()); @@ -330,7 +384,7 @@ public class MemoryRecordsBuilderTest { long logAppendTime = System.currentTimeMillis(); MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType, TimestampType.CREATE_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, - false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); builder.appendWithOffset(0L, System.currentTimeMillis(), "a".getBytes(), null); @@ -346,13 +400,18 @@ public class MemoryRecordsBuilderTest { builder.append(10L, "1".getBytes(), "a".getBytes()); builder.close(); + MemoryRecords.writeEndTransactionalMarker(buffer, 1L, 15L, (short) 0, + new EndTransactionMarker(ControlRecordType.ABORT, 0)); + builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, compressionType, TimestampType.CREATE_TIME, 1L); - builder.append(11L, "2".getBytes(), "b".getBytes()); - builder.appendControlRecord(12L, ControlRecordType.COMMIT, null); + builder.append(12L, "2".getBytes(), "b".getBytes()); builder.append(13L, "3".getBytes(), "c".getBytes()); builder.close(); + MemoryRecords.writeEndTransactionalMarker(buffer, 14L, 1L, (short) 0, + new EndTransactionMarker(ControlRecordType.COMMIT, 0)); + buffer.flip(); Records records = MemoryRecords.readableRecords(buffer).downConvert(RecordBatch.MAGIC_VALUE_V1); http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java index 49e1429..014a5bd 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java @@ -65,7 +65,7 @@ public class MemoryRecordsTest { ByteBuffer buffer = ByteBuffer.allocate(1024); MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compression, - TimestampType.CREATE_TIME, firstOffset, logAppendTime, pid, epoch, firstSequence, false, + TimestampType.CREATE_TIME, firstOffset, logAppendTime, pid, epoch, firstSequence, false, false, partitionLeaderEpoch, buffer.limit()); SimpleRecord[] records = new SimpleRecord[] { @@ -216,9 +216,44 @@ public class MemoryRecordsTest { } @Test + public void testBuildEndTxnMarker() { + if (magic >= RecordBatch.MAGIC_VALUE_V2) { + long producerId = 73; + short producerEpoch = 13; + long initialOffset = 983L; + int coordinatorEpoch = 347; + EndTransactionMarker marker = new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch); + MemoryRecords records = MemoryRecords.withEndTransactionMarker(initialOffset, producerId, producerEpoch, marker); + // verify that buffer allocation was precise + assertEquals(records.buffer().remaining(), records.buffer().capacity()); + + List<MutableRecordBatch> batches = TestUtils.toList(records.batches()); + assertEquals(1, batches.size()); + + RecordBatch batch = batches.get(0); + assertTrue(batch.isControlBatch()); + assertEquals(producerId, batch.producerId()); + assertEquals(producerEpoch, batch.producerEpoch()); + assertEquals(initialOffset, batch.baseOffset()); + assertTrue(batch.isValid()); + + List<Record> createdRecords = TestUtils.toList(batch); + assertEquals(1, createdRecords.size()); + + Record record = createdRecords.get(0); + assertTrue(record.isValid()); + EndTransactionMarker deserializedMarker = EndTransactionMarker.deserialize(record); + assertEquals(ControlRecordType.COMMIT, deserializedMarker.controlType()); + assertEquals(coordinatorEpoch, deserializedMarker.coordinatorEpoch()); + } + } + + @Test public void testFilterToPreservesProducerInfo() { if (magic >= RecordBatch.MAGIC_VALUE_V2) { ByteBuffer buffer = ByteBuffer.allocate(2048); + + // non-idempotent, non-transactional MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 0L); builder.append(10L, null, "a".getBytes()); builder.append(11L, "1".getBytes(), "b".getBytes()); @@ -226,17 +261,28 @@ public class MemoryRecordsTest { builder.close(); - long pid = 23L; - short epoch = 5; - int baseSequence = 10; - + // idempotent + long pid1 = 23L; + short epoch1 = 5; + int baseSequence1 = 10; builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 3L, - RecordBatch.NO_TIMESTAMP, pid, epoch, baseSequence); + RecordBatch.NO_TIMESTAMP, pid1, epoch1, baseSequence1); builder.append(13L, null, "d".getBytes()); builder.append(14L, "4".getBytes(), "e".getBytes()); builder.append(15L, "5".getBytes(), "f".getBytes()); builder.close(); + // transactional + long pid2 = 99384L; + short epoch2 = 234; + int baseSequence2 = 15; + builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 3L, + RecordBatch.NO_TIMESTAMP, pid2, epoch2, baseSequence2, true, RecordBatch.NO_PARTITION_LEADER_EPOCH); + builder.append(16L, "6".getBytes(), "g".getBytes()); + builder.append(17L, null, "h".getBytes()); + builder.append(18L, "8".getBytes(), "i".getBytes()); + builder.close(); + buffer.flip(); ByteBuffer filtered = ByteBuffer.allocate(2048); @@ -246,7 +292,7 @@ public class MemoryRecordsTest { MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered); List<MutableRecordBatch> batches = TestUtils.toList(filteredRecords.batches()); - assertEquals(2, batches.size()); + assertEquals(3, batches.size()); MutableRecordBatch firstBatch = batches.get(0); assertEquals(1, firstBatch.countOrNull().intValue()); @@ -256,15 +302,27 @@ public class MemoryRecordsTest { assertEquals(RecordBatch.NO_PRODUCER_EPOCH, firstBatch.producerEpoch()); assertEquals(RecordBatch.NO_SEQUENCE, firstBatch.baseSequence()); assertEquals(RecordBatch.NO_SEQUENCE, firstBatch.lastSequence()); + assertFalse(firstBatch.isTransactional()); MutableRecordBatch secondBatch = batches.get(1); assertEquals(2, secondBatch.countOrNull().intValue()); assertEquals(3L, secondBatch.baseOffset()); assertEquals(5L, secondBatch.lastOffset()); - assertEquals(pid, secondBatch.producerId()); - assertEquals(epoch, secondBatch.producerEpoch()); - assertEquals(baseSequence, secondBatch.baseSequence()); - assertEquals(baseSequence + 2, secondBatch.lastSequence()); + assertEquals(pid1, secondBatch.producerId()); + assertEquals(epoch1, secondBatch.producerEpoch()); + assertEquals(baseSequence1, secondBatch.baseSequence()); + assertEquals(baseSequence1 + 2, secondBatch.lastSequence()); + assertFalse(secondBatch.isTransactional()); + + MutableRecordBatch thirdBatch = batches.get(2); + assertEquals(2, thirdBatch.countOrNull().intValue()); + assertEquals(3L, thirdBatch.baseOffset()); + assertEquals(5L, thirdBatch.lastOffset()); + assertEquals(pid2, thirdBatch.producerId()); + assertEquals(epoch2, thirdBatch.producerEpoch()); + assertEquals(baseSequence2, thirdBatch.baseSequence()); + assertEquals(baseSequence2 + 2, thirdBatch.lastSequence()); + assertTrue(thirdBatch.isTransactional()); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index c948fd1..6443e4d 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -94,6 +94,9 @@ public class RequestResponseTest { checkRequest(createListOffsetRequest(1)); checkErrorResponse(createListOffsetRequest(1), new UnknownServerException()); checkResponse(createListOffsetResponse(1), 1); + checkRequest(createListOffsetRequest(2)); + checkErrorResponse(createListOffsetRequest(2), new UnknownServerException()); + checkResponse(createListOffsetResponse(2), 2); checkRequest(MetadataRequest.Builder.allTopics().build((short) 2)); checkRequest(createMetadataRequest(1, asList("topic1"))); checkErrorResponse(createMetadataRequest(1, asList("topic1")), new UnknownServerException()); @@ -621,11 +624,24 @@ public class RequestResponseTest { Map<TopicPartition, ListOffsetRequest.PartitionData> offsetData = Collections.singletonMap( new TopicPartition("test", 0), new ListOffsetRequest.PartitionData(1000000L, 10)); - return ListOffsetRequest.Builder.forConsumer(false).setOffsetData(offsetData).build((short) version); + return ListOffsetRequest.Builder + .forConsumer(false, IsolationLevel.READ_UNCOMMITTED) + .setOffsetData(offsetData) + .build((short) version); } else if (version == 1) { Map<TopicPartition, Long> offsetData = Collections.singletonMap( new TopicPartition("test", 0), 1000000L); - return ListOffsetRequest.Builder.forConsumer(true).setTargetTimes(offsetData).build((short) version); + return ListOffsetRequest.Builder + .forConsumer(true, IsolationLevel.READ_UNCOMMITTED) + .setTargetTimes(offsetData) + .build((short) version); + } else if (version == 2) { + Map<TopicPartition, Long> offsetData = Collections.singletonMap( + new TopicPartition("test", 0), 1000000L); + return ListOffsetRequest.Builder + .forConsumer(true, IsolationLevel.READ_COMMITTED) + .setTargetTimes(offsetData) + .build((short) version); } else { throw new IllegalArgumentException("Illegal ListOffsetRequest version " + version); } @@ -638,7 +654,7 @@ public class RequestResponseTest { responseData.put(new TopicPartition("test", 0), new ListOffsetResponse.PartitionData(Errors.NONE, asList(100L))); return new ListOffsetResponse(responseData); - } else if (version == 1) { + } else if (version == 1 || version == 2) { Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<>(); responseData.put(new TopicPartition("test", 0), new ListOffsetResponse.PartitionData(Errors.NONE, 10000L, 100L)); http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/cluster/Partition.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 1eea8dc..1d13689 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -456,7 +456,7 @@ class Partition(val topic: String, laggingReplicas } - def appendRecordsToLeader(records: MemoryRecords, requiredAcks: Int = 0) = { + def appendRecordsToLeader(records: MemoryRecords, isFromClient: Boolean, requiredAcks: Int = 0) = { val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) { leaderReplicaIfLocal match { case Some(leaderReplica) => @@ -470,7 +470,7 @@ class Partition(val topic: String, .format(topicPartition, inSyncSize, minIsr)) } - val info = log.appendAsLeader(records, leaderEpoch = this.leaderEpoch) + val info = log.appendAsLeader(records, leaderEpoch = this.leaderEpoch, isFromClient) // probably unblock some follower fetch requests since log end offset has been updated replicaManager.tryCompleteDelayedFetch(TopicPartitionOperationKey(this.topic, this.partitionId)) // we may need to increment high watermark since ISR could be down to 1 http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/cluster/Replica.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala index a604b87..e3b1f2d 100644 --- a/core/src/main/scala/kafka/cluster/Replica.scala +++ b/core/src/main/scala/kafka/cluster/Replica.scala @@ -135,6 +135,7 @@ class Replica(val brokerId: Int, def highWatermark_=(newHighWatermark: LogOffsetMetadata) { if (isLocal) { highWatermarkMetadata = newHighWatermark + log.foreach(_.onHighWatermarkIncremented(newHighWatermark.messageOffset)) trace(s"Setting high watermark for replica $brokerId partition $topicPartition to [$newHighWatermark]") } else { throw new KafkaException(s"Should not set high watermark on partition $topicPartition's non-local replica $brokerId") @@ -143,6 +144,23 @@ class Replica(val brokerId: Int, def highWatermark = highWatermarkMetadata + /** + * The last stable offset (LSO) is defined as the first offset such that all lower offsets have been "decided." + * Non-transactional messages are considered decided immediately, but transactional messages are only decided when + * the corresponding COMMIT or ABORT marker is written. This implies that the last stable offset will be equal + * to the high watermark if there are no transactional messages in the log. Note also that the LSO cannot advance + * beyond the high watermark. + */ + def lastStableOffset: LogOffsetMetadata = { + log.map { log => + log.firstUnstableOffset match { + case Some(offsetMetadata) if offsetMetadata.messageOffset < highWatermark.messageOffset => offsetMetadata + case _ => highWatermark + } + }.getOrElse(throw new KafkaException(s"Cannot fetch last stable offset on partition $topicPartition's " + + s"non-local replica $brokerId")) + } + def convertHWToLocalOffsetMetadata() = { if (isLocal) { highWatermarkMetadata = log.get.convertToOffsetMetadata(highWatermarkMetadata.messageOffset) @@ -165,7 +183,10 @@ class Replica(val brokerId: Int, replicaString.append("; Partition: " + partition.partitionId) replicaString.append("; isLocal: " + isLocal) replicaString.append("; lastCaughtUpTimeMs: " + lastCaughtUpTimeMs) - if (isLocal) replicaString.append("; Highwatermark: " + highWatermark) + if (isLocal) { + replicaString.append("; Highwatermark: " + highWatermark) + replicaString.append("; LastStableOffset: " + lastStableOffset) + } replicaString.toString } } http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index e711392..3eafdb7 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -38,7 +38,7 @@ import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.protocol.types.Type._ import org.apache.kafka.common.protocol.types.{ArrayOf, Field, Schema, Struct} import org.apache.kafka.common.record._ -import org.apache.kafka.common.requests.OffsetFetchResponse +import org.apache.kafka.common.requests.{IsolationLevel, OffsetFetchResponse} import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.utils.{Time, Utils} @@ -225,7 +225,8 @@ class GroupMetadataManager(brokerId: Int, replicaManager.appendRecords( config.offsetCommitTimeoutMs.toLong, config.offsetCommitRequiredAcks, - true, // allow appending to internal offset topic + internalTopicsAllowed = true, + isFromClient = false, delayedStore.partitionRecords, delayedStore.callback) } @@ -429,7 +430,8 @@ class GroupMetadataManager(brokerId: Int, case Some(log) => var currOffset = log.logStartOffset - val buffer = ByteBuffer.allocate(config.loadBufferSize) + lazy val buffer = ByteBuffer.allocate(config.loadBufferSize) + // loop breaks if leader changes at any time during the load, since getHighWatermark is -1 val loadedOffsets = mutable.Map[GroupTopicPartition, OffsetAndMetadata]() val removedOffsets = mutable.Set[GroupTopicPartition]() @@ -437,12 +439,18 @@ class GroupMetadataManager(brokerId: Int, val removedGroups = mutable.Set[String]() while (currOffset < highWaterMark && !shuttingDown.get()) { - buffer.clear() - val fileRecords = log.read(currOffset, config.loadBufferSize, maxOffset = None, minOneMessage = true) - .records.asInstanceOf[FileRecords] - val bufferRead = fileRecords.readInto(buffer, 0) + val fetchDataInfo = log.read(currOffset, config.loadBufferSize, maxOffset = None, minOneMessage = true, + isolationLevel = IsolationLevel.READ_UNCOMMITTED) + + val memRecords = fetchDataInfo.records match { + case records: MemoryRecords => records + case fileRecords: FileRecords => + buffer.clear() + val bufferRead = fileRecords.readInto(buffer, 0) + MemoryRecords.readableRecords(bufferRead) + } - MemoryRecords.readableRecords(bufferRead).batches.asScala.foreach { batch => + memRecords.batches.asScala.foreach { batch => for (record <- batch.asScala) { require(record.hasKey, "Group metadata/offset entry key should not be null") GroupMetadataManager.readMessageKey(record.key) match { @@ -630,7 +638,8 @@ class GroupMetadataManager(brokerId: Int, // do not need to require acks since even if the tombstone is lost, // it will be appended again in the next purge cycle val records = MemoryRecords.withRecords(magicValue, 0L, compressionType, timestampType, tombstones: _*) - partition.appendRecordsToLeader(records) + partition.appendRecordsToLeader(records, isFromClient = false, requiredAcks = 0) + offsetsRemoved += removedOffsets.size trace(s"Successfully appended ${tombstones.size} tombstones to $appendPartition for expired/deleted " + s"offsets and/or metadata for group $groupId") http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala index f07ca91..7930cd0 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala @@ -31,6 +31,7 @@ import kafka.utils.{Logging, Pool, Scheduler, ZkUtils} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record.{FileRecords, MemoryRecords, SimpleRecord} +import org.apache.kafka.common.requests.IsolationLevel import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.utils.{Time, Utils} @@ -158,8 +159,7 @@ class TransactionStateManager(brokerId: Int, warn(s"Attempted to load offsets and group metadata from $topicPartition, but found no log") case Some(log) => - val buffer = ByteBuffer.allocate(config.transactionLogLoadBufferSize) - + lazy val buffer = ByteBuffer.allocate(config.transactionLogLoadBufferSize) val loadedTransactions = mutable.Map.empty[String, TransactionMetadata] val removedTransactionalIds = mutable.Set.empty[String] @@ -169,11 +169,17 @@ class TransactionStateManager(brokerId: Int, && loadingPartitions.contains(topicPartition.partition()) && !shuttingDown.get()) { buffer.clear() - val fileRecords = log.read(currOffset, config.transactionLogLoadBufferSize, maxOffset = None, minOneMessage = true) - .records.asInstanceOf[FileRecords] - val bufferRead = fileRecords.readInto(buffer, 0) + val fetchDataInfo = log.read(currOffset, config.transactionLogLoadBufferSize, maxOffset = None, + minOneMessage = true, isolationLevel = IsolationLevel.READ_UNCOMMITTED) + val memRecords = fetchDataInfo.records match { + case records: MemoryRecords => records + case fileRecords: FileRecords => + buffer.clear() + val bufferRead = fileRecords.readInto(buffer, 0) + MemoryRecords.readableRecords(bufferRead) + } - MemoryRecords.readableRecords(bufferRead).batches.asScala.foreach { batch => + memRecords.batches.asScala.foreach { batch => for (record <- batch.asScala) { require(record.hasKey, "Transaction state log's key should not be null") TransactionLog.readMessageKey(record.key) match { @@ -414,6 +420,7 @@ class TransactionStateManager(brokerId: Int, txnMetadata.txnTimeoutMs.toLong, TransactionLog.EnforcedRequiredAcks, internalTopicsAllowed = true, + isFromClient = false, recordsPerPartition, updateCacheCallback) } http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/log/AbstractIndex.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala index f7478ad..a125676 100644 --- a/core/src/main/scala/kafka/log/AbstractIndex.scala +++ b/core/src/main/scala/kafka/log/AbstractIndex.scala @@ -246,14 +246,26 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon * @param target The index key to look for * @return The slot found or -1 if the least entry in the index is larger than the target key or the index is empty */ - protected def indexSlotFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): Int = { + protected def largestLowerBoundSlotFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): Int = + indexSlotRangeFor(idx, target, searchEntity)._1 + + /** + * Find the smallest entry greater than or equal the target key or value. If none can be found, -1 is returned. + */ + protected def smallestUpperBoundSlotFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): Int = + indexSlotRangeFor(idx, target, searchEntity)._2 + + /** + * Lookup lower and upper bounds for the given target. + */ + private def indexSlotRangeFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): (Int, Int) = { // check if the index is empty if(_entries == 0) - return -1 + return (-1, -1) // check if the target offset is smaller than the least offset if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0) - return -1 + return (-1, 0) // binary search for the entry var lo = 0 @@ -267,9 +279,10 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon else if(compareResult < 0) lo = mid else - return mid + return (mid, mid) } - lo + + (lo, if (lo == _entries - 1) -1 else lo + 1) } private def compareIndexEntry(indexEntry: IndexEntry, target: Long, searchEntity: IndexSearchEntity): Int = {
