http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java b/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java index 1bc8a65..4a678d5 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java +++ b/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.record; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.CorruptRecordException; import org.apache.kafka.common.utils.AbstractIterator; import org.apache.kafka.common.utils.Utils; @@ -25,51 +26,58 @@ import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayDeque; +import java.util.Iterator; +/** + * An iterator which handles both the shallow and deep iteration of record sets. + */ public class RecordsIterator extends AbstractIterator<LogEntry> { - private final LogInputStream logStream; private final boolean shallow; + private final boolean ensureMatchingMagic; + private final int masRecordSize; + private final ShallowRecordsIterator<?> shallowIter; private DeepRecordsIterator innerIter; - public RecordsIterator(LogInputStream logStream, boolean shallow) { - this.logStream = logStream; + public RecordsIterator(LogInputStream<?> logInputStream, + boolean shallow, + boolean ensureMatchingMagic, + int masRecordSize) { + this.shallowIter = new ShallowRecordsIterator<>(logInputStream); this.shallow = shallow; + this.ensureMatchingMagic = ensureMatchingMagic; + this.masRecordSize = masRecordSize; } - /* - * Read the next record from the buffer. - * - * Note that in the compressed message set, each message value size is set as the size of the un-compressed - * version of the message value, so when we do de-compression allocating an array of the specified size for - * reading compressed value data is sufficient. + /** + * Get a shallow iterator over the given input stream. + * @param logInputStream The log input stream to read the entries from + * @param <T> The type of the log entry + * @return The shallow iterator. */ + public static <T extends LogEntry> Iterator<T> shallowIterator(LogInputStream<T> logInputStream) { + return new ShallowRecordsIterator<>(logInputStream); + } + @Override protected LogEntry makeNext() { if (innerDone()) { - try { - LogEntry entry = logStream.nextEntry(); - // No more record to return. - if (entry == null) - return allDone(); - - // decide whether to go shallow or deep iteration if it is compressed - CompressionType compressionType = entry.record().compressionType(); - if (compressionType == CompressionType.NONE || shallow) { - return entry; - } else { - // init the inner iterator with the value payload of the message, - // which will de-compress the payload to a set of messages; - // since we assume nested compression is not allowed, the deep iterator - // would not try to further decompress underlying messages - // There will be at least one element in the inner iterator, so we don't - // need to call hasNext() here. - innerIter = new DeepRecordsIterator(entry); - return innerIter.next(); - } - } catch (EOFException e) { + if (!shallowIter.hasNext()) return allDone(); - } catch (IOException e) { - throw new KafkaException(e); + + LogEntry entry = shallowIter.next(); + + // decide whether to go shallow or deep iteration if it is compressed + if (shallow || !entry.isCompressed()) { + return entry; + } else { + // init the inner iterator with the value payload of the message, + // which will de-compress the payload to a set of messages; + // since we assume nested compression is not allowed, the deep iterator + // would not try to further decompress underlying messages + // There will be at least one element in the inner iterator, so we don't + // need to call hasNext() here. + innerIter = new DeepRecordsIterator(entry, ensureMatchingMagic, masRecordSize); + return innerIter.next(); } } else { return innerIter.next(); @@ -80,38 +88,70 @@ public class RecordsIterator extends AbstractIterator<LogEntry> { return innerIter == null || !innerIter.hasNext(); } - private static class DataLogInputStream implements LogInputStream { + private static class DataLogInputStream implements LogInputStream<LogEntry> { private final DataInputStream stream; + protected final int maxMessageSize; - private DataLogInputStream(DataInputStream stream) { + DataLogInputStream(DataInputStream stream, int maxMessageSize) { this.stream = stream; + this.maxMessageSize = maxMessageSize; } public LogEntry nextEntry() throws IOException { - long offset = stream.readLong(); - int size = stream.readInt(); - if (size < 0) - throw new IllegalStateException("Record with size " + size); - - byte[] recordBuffer = new byte[size]; - stream.readFully(recordBuffer, 0, size); - ByteBuffer buf = ByteBuffer.wrap(recordBuffer); - return new LogEntry(offset, new Record(buf)); + try { + long offset = stream.readLong(); + int size = stream.readInt(); + if (size < Record.RECORD_OVERHEAD_V0) + throw new CorruptRecordException(String.format("Record size is less than the minimum record overhead (%d)", Record.RECORD_OVERHEAD_V0)); + if (size > maxMessageSize) + throw new CorruptRecordException(String.format("Record size exceeds the largest allowable message size (%d).", maxMessageSize)); + + byte[] recordBuffer = new byte[size]; + stream.readFully(recordBuffer, 0, size); + ByteBuffer buf = ByteBuffer.wrap(recordBuffer); + return LogEntry.create(offset, new Record(buf)); + } catch (EOFException e) { + return null; + } } } - private static class DeepRecordsIterator extends AbstractIterator<LogEntry> { + private static class ShallowRecordsIterator<T extends LogEntry> extends AbstractIterator<T> { + private final LogInputStream<T> logStream; + + public ShallowRecordsIterator(LogInputStream<T> logStream) { + this.logStream = logStream; + } + + @Override + protected T makeNext() { + try { + T entry = logStream.nextEntry(); + if (entry == null) + return allDone(); + return entry; + } catch (IOException e) { + throw new KafkaException(e); + } + } + } + + public static class DeepRecordsIterator extends AbstractIterator<LogEntry> { private final ArrayDeque<LogEntry> logEntries; private final long absoluteBaseOffset; + private final byte wrapperMagic; + + public DeepRecordsIterator(LogEntry wrapperEntry, boolean ensureMatchingMagic, int maxMessageSize) { + Record wrapperRecord = wrapperEntry.record(); + this.wrapperMagic = wrapperRecord.magic(); - private DeepRecordsIterator(LogEntry entry) { - CompressionType compressionType = entry.record().compressionType(); - ByteBuffer buffer = entry.record().value(); - DataInputStream stream = Compressor.wrapForInput(new ByteBufferInputStream(buffer), compressionType, entry.record().magic()); - LogInputStream logStream = new DataLogInputStream(stream); + CompressionType compressionType = wrapperRecord.compressionType(); + ByteBuffer buffer = wrapperRecord.value(); + DataInputStream stream = MemoryRecordsBuilder.wrapForInput(new ByteBufferInputStream(buffer), compressionType, wrapperRecord.magic()); + LogInputStream logStream = new DataLogInputStream(stream, maxMessageSize); - long wrapperRecordOffset = entry.offset(); - long wrapperRecordTimestamp = entry.record().timestamp(); + long wrapperRecordOffset = wrapperEntry.offset(); + long wrapperRecordTimestamp = wrapperRecord.timestamp(); this.logEntries = new ArrayDeque<>(); // If relative offset is used, we need to decompress the entire message first to compute @@ -119,22 +159,27 @@ public class RecordsIterator extends AbstractIterator<LogEntry> { // do the same for message format version 0 try { while (true) { - try { - LogEntry logEntry = logStream.nextEntry(); - if (entry.record().magic() > Record.MAGIC_VALUE_V0) { - Record recordWithTimestamp = new Record( - logEntry.record().buffer(), - wrapperRecordTimestamp, - entry.record().timestampType() - ); - logEntry = new LogEntry(logEntry.offset(), recordWithTimestamp); - } - logEntries.add(logEntry); - } catch (EOFException e) { + LogEntry logEntry = logStream.nextEntry(); + if (logEntry == null) break; + + Record record = logEntry.record(); + byte magic = record.magic(); + + if (ensureMatchingMagic && magic != wrapperMagic) + throw new InvalidRecordException("Compressed message magic does not match wrapper magic"); + + if (magic > Record.MAGIC_VALUE_V0) { + Record recordWithTimestamp = new Record( + record.buffer(), + wrapperRecordTimestamp, + wrapperRecord.timestampType() + ); + logEntry = LogEntry.create(logEntry.offset(), recordWithTimestamp); } + logEntries.addLast(logEntry); } - if (entry.record().magic() > Record.MAGIC_VALUE_V0) + if (wrapperMagic > Record.MAGIC_VALUE_V0) this.absoluteBaseOffset = wrapperRecordOffset - logEntries.getLast().offset(); else this.absoluteBaseOffset = -1; @@ -155,12 +200,10 @@ public class RecordsIterator extends AbstractIterator<LogEntry> { // Convert offset to absolute offset if needed. if (absoluteBaseOffset >= 0) { long absoluteOffset = absoluteBaseOffset + entry.offset(); - entry = new LogEntry(absoluteOffset, entry.record()); + entry = LogEntry.create(absoluteOffset, entry.record()); } - // decide whether to go shallow or deep iteration if it is compressed - CompressionType compression = entry.record().compressionType(); - if (compression != CompressionType.NONE) + if (entry.isCompressed()) throw new InvalidRecordException("Inner messages must not be compressed"); return entry;
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java b/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java index 62fd814..55c966a 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java +++ b/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java @@ -27,6 +27,7 @@ public enum TimestampType { public final int id; public final String name; + TimestampType(int id, String name) { this.id = id; this.name = name; http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/utils/Utils.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index c3c1045..c5e6716 100755 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -266,6 +266,24 @@ public class Utils { } /** + * Convert a ByteBuffer to a nullable array. + * @param buffer The buffer to convert + * @return The resulting array or null if the buffer is null + */ + public static byte[] toNullableArray(ByteBuffer buffer) { + return buffer == null ? null : toArray(buffer); + } + + /** + * Wrap an array as a nullable ByteBuffer. + * @param array The nullable array to wrap + * @return The wrapping ByteBuffer or null if array is null + */ + public static ByteBuffer wrapNullable(byte[] array) { + return array == null ? null : ByteBuffer.wrap(array); + } + + /** * Read a byte array from the given offset and size in the buffer */ public static byte[] toArray(ByteBuffer buffer, int offset, int size) { @@ -733,4 +751,37 @@ public class Utils { public static int longHashcode(long value) { return (int) (value ^ (value >>> 32)); } + + /** + * Read a size-delimited byte buffer starting at the given offset. + * @param buffer Buffer containing the size and data + * @param start Offset in the buffer to read from + * @return A slice of the buffer containing only the delimited data (excluding the size) + */ + public static ByteBuffer sizeDelimited(ByteBuffer buffer, int start) { + int size = buffer.getInt(start); + if (size < 0) { + return null; + } else { + ByteBuffer b = buffer.duplicate(); + b.position(start + 4); + b = b.slice(); + b.limit(size); + b.rewind(); + return b; + } + } + + /** + * Compute the checksum of a range of data + * @param buffer Buffer containing the data to checksum + * @param start Offset in the buffer to read from + * @param size The number of bytes to include + */ + public static long computeChecksum(ByteBuffer buffer, int start, int size) { + Crc32 crc = new Crc32(); + crc.update(buffer.array(), buffer.arrayOffset() + start, size); + return crc.getValue(); + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index ad6c127..a4386f8 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -39,6 +39,8 @@ import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.FetchResponse; import org.apache.kafka.common.requests.FetchResponse.PartitionData; @@ -1323,11 +1325,10 @@ public class KafkaConsumerTest { TopicPartition partition = fetchEntry.getKey(); long fetchOffset = fetchEntry.getValue().offset; int fetchCount = fetchEntry.getValue().count; - MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE); + MemoryRecordsBuilder records = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME); for (int i = 0; i < fetchCount; i++) records.append(fetchOffset + i, 0L, ("key-" + i).getBytes(), ("value-" + i).getBytes()); - records.close(); - tpResponses.put(partition, new FetchResponse.PartitionData(Errors.NONE.code(), 0, records)); + tpResponses.put(partition, new FetchResponse.PartitionData(Errors.NONE.code(), 0, records.build())); } return new FetchResponse(tpResponses, 0); } http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 6d5896f..15075cb 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -37,10 +37,12 @@ import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.ByteBufferOutputStream; import org.apache.kafka.common.record.CompressionType; -import org.apache.kafka.common.record.Compressor; import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.requests.FetchResponse; @@ -93,8 +95,8 @@ public class FetcherTest { private static final double EPSILON = 0.0001; private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000); - private MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE); - private MemoryRecords nextRecords = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE); + private MemoryRecords records; + private MemoryRecords nextRecords; private Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, metrics); private Metrics fetcherMetrics = new Metrics(time); private Fetcher<byte[], byte[]> fetcherNoAutoReset = createFetcher(subscriptionsNoAutoReset, fetcherMetrics); @@ -104,14 +106,16 @@ public class FetcherTest { metadata.update(cluster, time.milliseconds()); client.setNode(node); - records.append(1L, 0L, "key".getBytes(), "value-1".getBytes()); - records.append(2L, 0L, "key".getBytes(), "value-2".getBytes()); - records.append(3L, 0L, "key".getBytes(), "value-3".getBytes()); - records.close(); + MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME); + builder.append(1L, 0L, "key".getBytes(), "value-1".getBytes()); + builder.append(2L, 0L, "key".getBytes(), "value-2".getBytes()); + builder.append(3L, 0L, "key".getBytes(), "value-3".getBytes()); + records = builder.build(); - nextRecords.append(4L, 0L, "key".getBytes(), "value-4".getBytes()); - nextRecords.append(5L, 0L, "key".getBytes(), "value-5".getBytes()); - nextRecords.close(); + builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME); + builder.append(4L, 0L, "key".getBytes(), "value-4".getBytes()); + builder.append(5L, 0L, "key".getBytes(), "value-5".getBytes()); + nextRecords = builder.build(); } @After @@ -129,7 +133,7 @@ public class FetcherTest { assertEquals(1, fetcher.sendFetches()); assertFalse(fetcher.hasCompletedFetches()); - client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0)); + client.prepareResponse(fetchResponse(this.records, Errors.NONE.code(), 100L, 0)); consumerClient.poll(0); assertTrue(fetcher.hasCompletedFetches()); @@ -154,7 +158,7 @@ public class FetcherTest { assertEquals(1, fetcher.sendFetches()); assertFalse(fetcher.hasCompletedFetches()); - client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NOT_LEADER_FOR_PARTITION.code(), 100L, 0)); + client.prepareResponse(fetchResponse(this.records, Errors.NOT_LEADER_FOR_PARTITION.code(), 100L, 0)); consumerClient.poll(0); assertTrue(fetcher.hasCompletedFetches()); @@ -192,7 +196,7 @@ public class FetcherTest { subscriptions.assignFromUser(singleton(tp)); subscriptions.seek(tp, 1); - client.prepareResponse(matchesOffset(tp, 1), fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0)); + client.prepareResponse(matchesOffset(tp, 1), fetchResponse(this.records, Errors.NONE.code(), 100L, 0)); assertEquals(1, fetcher.sendFetches()); consumerClient.poll(0); @@ -206,29 +210,30 @@ public class FetcherTest { } @Test - public void testParseInvalidRecord() { + public void testParseInvalidRecord() throws Exception { ByteBuffer buffer = ByteBuffer.allocate(1024); - Compressor compressor = new Compressor(buffer, CompressionType.NONE); + ByteBufferOutputStream out = new ByteBufferOutputStream(buffer); + byte magic = Record.CURRENT_MAGIC_VALUE; byte[] key = "foo".getBytes(); byte[] value = "baz".getBytes(); long offset = 0; long timestamp = 500L; int size = Record.recordSize(key, value); - long crc = Record.computeChecksum(timestamp, key, value, CompressionType.NONE, 0, -1); + byte attributes = Record.computeAttributes(magic, CompressionType.NONE, TimestampType.CREATE_TIME); + long crc = Record.computeChecksum(magic, attributes, timestamp, key, value); // write one valid record - compressor.putLong(offset); - compressor.putInt(size); - Record.write(compressor, crc, Record.computeAttributes(CompressionType.NONE), timestamp, key, value, 0, -1); + out.writeLong(offset); + out.writeInt(size); + Record.write(out, magic, crc, Record.computeAttributes(magic, CompressionType.NONE, TimestampType.CREATE_TIME), timestamp, key, value); // and one invalid record (note the crc) - compressor.putLong(offset); - compressor.putInt(size); - Record.write(compressor, crc + 1, Record.computeAttributes(CompressionType.NONE), timestamp, key, value, 0, -1); + out.writeLong(offset); + out.writeInt(size); + Record.write(out, magic, crc + 1, Record.computeAttributes(magic, CompressionType.NONE, TimestampType.CREATE_TIME), timestamp, key, value); - compressor.close(); buffer.flip(); subscriptions.assignFromUser(singleton(tp)); @@ -236,7 +241,7 @@ public class FetcherTest { // normal fetch assertEquals(1, fetcher.sendFetches()); - client.prepareResponse(fetchResponse(buffer, Errors.NONE.code(), 100L, 0)); + client.prepareResponse(fetchResponse(MemoryRecords.readableRecords(buffer), Errors.NONE.code(), 100L, 0)); consumerClient.poll(0); try { fetcher.fetchedRecords(); @@ -255,8 +260,8 @@ public class FetcherTest { subscriptions.assignFromUser(singleton(tp)); subscriptions.seek(tp, 1); - client.prepareResponse(matchesOffset(tp, 1), fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0)); - client.prepareResponse(matchesOffset(tp, 4), fetchResponse(this.nextRecords.buffer(), Errors.NONE.code(), 100L, 0)); + client.prepareResponse(matchesOffset(tp, 1), fetchResponse(this.records, Errors.NONE.code(), 100L, 0)); + client.prepareResponse(matchesOffset(tp, 4), fetchResponse(this.nextRecords, Errors.NONE.code(), 100L, 0)); assertEquals(1, fetcher.sendFetches()); consumerClient.poll(0); @@ -287,11 +292,11 @@ public class FetcherTest { // if we are fetching from a compacted topic, there may be gaps in the returned records // this test verifies the fetcher updates the current fetched/consumed positions correctly for this case - MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE); - records.append(15L, 0L, "key".getBytes(), "value-1".getBytes()); - records.append(20L, 0L, "key".getBytes(), "value-2".getBytes()); - records.append(30L, 0L, "key".getBytes(), "value-3".getBytes()); - records.close(); + MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME); + builder.append(15L, 0L, "key".getBytes(), "value-1".getBytes()); + builder.append(20L, 0L, "key".getBytes(), "value-2".getBytes()); + builder.append(30L, 0L, "key".getBytes(), "value-3".getBytes()); + MemoryRecords records = builder.build(); List<ConsumerRecord<byte[], byte[]>> consumerRecords; subscriptions.assignFromUser(singleton(tp)); @@ -299,7 +304,7 @@ public class FetcherTest { // normal fetch assertEquals(1, fetcher.sendFetches()); - client.prepareResponse(fetchResponse(records.buffer(), Errors.NONE.code(), 100L, 0)); + client.prepareResponse(fetchResponse(records, Errors.NONE.code(), 100L, 0)); consumerClient.poll(0); consumerRecords = fetcher.fetchedRecords().get(tp); assertEquals(3, consumerRecords.size()); @@ -317,7 +322,7 @@ public class FetcherTest { // resize the limit of the buffer to pretend it is only fetch-size large assertEquals(1, fetcher.sendFetches()); - client.prepareResponse(fetchResponse(this.records.buffer(), Errors.TOPIC_AUTHORIZATION_FAILED.code(), 100L, 0)); + client.prepareResponse(fetchResponse(this.records, Errors.TOPIC_AUTHORIZATION_FAILED.code(), 100L, 0)); consumerClient.poll(0); try { fetcher.fetchedRecords(); @@ -337,7 +342,7 @@ public class FetcherTest { // Now the rebalance happens and fetch positions are cleared subscriptions.assignFromSubscribed(singleton(tp)); - client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0)); + client.prepareResponse(fetchResponse(this.records, Errors.NONE.code(), 100L, 0)); consumerClient.poll(0); // The active fetch should be ignored since its position is no longer valid @@ -352,7 +357,7 @@ public class FetcherTest { assertEquals(1, fetcher.sendFetches()); subscriptions.pause(tp); - client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0)); + client.prepareResponse(fetchResponse(this.records, Errors.NONE.code(), 100L, 0)); consumerClient.poll(0); assertNull(fetcher.fetchedRecords().get(tp)); } @@ -373,7 +378,7 @@ public class FetcherTest { subscriptions.seek(tp, 0); assertEquals(1, fetcher.sendFetches()); - client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NOT_LEADER_FOR_PARTITION.code(), 100L, 0)); + client.prepareResponse(fetchResponse(this.records, Errors.NOT_LEADER_FOR_PARTITION.code(), 100L, 0)); consumerClient.poll(0); assertEquals(0, fetcher.fetchedRecords().size()); assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds())); @@ -385,7 +390,7 @@ public class FetcherTest { subscriptions.seek(tp, 0); assertEquals(1, fetcher.sendFetches()); - client.prepareResponse(fetchResponse(this.records.buffer(), Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), 100L, 0)); + client.prepareResponse(fetchResponse(this.records, Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), 100L, 0)); consumerClient.poll(0); assertEquals(0, fetcher.fetchedRecords().size()); assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds())); @@ -397,7 +402,7 @@ public class FetcherTest { subscriptions.seek(tp, 0); assertEquals(1, fetcher.sendFetches()); - client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0)); + client.prepareResponse(fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0)); consumerClient.poll(0); assertEquals(0, fetcher.fetchedRecords().size()); assertTrue(subscriptions.isOffsetResetNeeded(tp)); @@ -412,7 +417,7 @@ public class FetcherTest { subscriptions.seek(tp, 0); assertEquals(1, fetcher.sendFetches()); - client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0)); + client.prepareResponse(fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0)); subscriptions.seek(tp, 1); consumerClient.poll(0); assertEquals(0, fetcher.fetchedRecords().size()); @@ -426,7 +431,7 @@ public class FetcherTest { subscriptionsNoAutoReset.seek(tp, 0); assertTrue(fetcherNoAutoReset.sendFetches() > 0); - client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0)); + client.prepareResponse(fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0)); consumerClient.poll(0); assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp)); subscriptionsNoAutoReset.seek(tp, 2); @@ -439,7 +444,7 @@ public class FetcherTest { subscriptionsNoAutoReset.seek(tp, 0); fetcherNoAutoReset.sendFetches(); - client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0)); + client.prepareResponse(fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0)); consumerClient.poll(0); assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp)); @@ -459,7 +464,7 @@ public class FetcherTest { subscriptions.seek(tp, 0); assertEquals(1, fetcher.sendFetches()); - client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0), true); + client.prepareResponse(fetchResponse(this.records, Errors.NONE.code(), 100L, 0), true); consumerClient.poll(0); assertEquals(0, fetcher.fetchedRecords().size()); @@ -611,14 +616,14 @@ public class FetcherTest { // We need to make sure the message offset grows. Otherwise they will be considered as already consumed // and filtered out by consumer. if (i > 1) { - this.records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE); + MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME); for (int v = 0; v < 3; v++) { - this.records.append((long) i * 3 + v, Record.NO_TIMESTAMP, "key".getBytes(), String.format("value-%d", v).getBytes()); + builder.append((long) i * 3 + v, Record.NO_TIMESTAMP, "key".getBytes(), String.format("value-%d", v).getBytes()); } - this.records.close(); + this.records = builder.build(); } assertEquals(1, fetcher.sendFetches()); - client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 100 * i)); + client.prepareResponse(fetchResponse(this.records, Errors.NONE.code(), 100L, 100 * i)); consumerClient.poll(0); records = fetcher.fetchedRecords().get(tp); assertEquals(3, records.size()); @@ -722,8 +727,7 @@ public class FetcherTest { return new ListOffsetResponse(allPartitionData, 1); } - private FetchResponse fetchResponse(ByteBuffer buffer, short error, long hw, int throttleTime) { - MemoryRecords records = MemoryRecords.readableRecords(buffer); + private FetchResponse fetchResponse(MemoryRecords records, short error, long hw, int throttleTime) { return new FetchResponse(Collections.singletonMap(tp, new FetchResponse.PartitionData(error, hw, records)), throttleTime); } http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index 28521e8..4f25bdf 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -12,23 +12,6 @@ */ package org.apache.kafka.clients.producer.internals; -import static java.util.Arrays.asList; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Deque; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; - import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.Cluster; @@ -45,6 +28,23 @@ import org.apache.kafka.common.utils.Time; import org.junit.After; import org.junit.Test; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Deque; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.Arrays.asList; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + public class RecordAccumulatorTest { private String topic = "test"; @@ -84,7 +84,7 @@ public class RecordAccumulatorTest { accum.append(tp1, 0L, key, value, null, maxBlockTimeMs); Deque<RecordBatch> partitionBatches = accum.batches().get(tp1); assertEquals(1, partitionBatches.size()); - assertTrue(partitionBatches.peekFirst().records.isWritable()); + assertTrue(partitionBatches.peekFirst().isWritable()); assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size()); } @@ -93,15 +93,15 @@ public class RecordAccumulatorTest { Deque<RecordBatch> partitionBatches = accum.batches().get(tp1); assertEquals(2, partitionBatches.size()); Iterator<RecordBatch> partitionBatchesIterator = partitionBatches.iterator(); - assertFalse(partitionBatchesIterator.next().records.isWritable()); - assertTrue(partitionBatchesIterator.next().records.isWritable()); + assertFalse(partitionBatchesIterator.next().isWritable()); + assertTrue(partitionBatchesIterator.next().isWritable()); assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id()); assertEquals(1, batches.size()); RecordBatch batch = batches.get(0); - Iterator<LogEntry> iter = batch.records.iterator(); + Iterator<LogEntry> iter = batch.records().deepIterator(); for (int i = 0; i < appends; i++) { LogEntry entry = iter.next(); assertEquals("Keys should match", ByteBuffer.wrap(key), entry.record().key()); @@ -130,7 +130,7 @@ public class RecordAccumulatorTest { assertEquals(1, batches.size()); RecordBatch batch = batches.get(0); - Iterator<LogEntry> iter = batch.records.iterator(); + Iterator<LogEntry> iter = batch.records().deepIterator(); LogEntry entry = iter.next(); assertEquals("Keys should match", ByteBuffer.wrap(key), entry.record().key()); assertEquals("Values should match", ByteBuffer.wrap(value), entry.record().value()); @@ -159,7 +159,7 @@ public class RecordAccumulatorTest { final int msgs = 10000; final int numParts = 2; final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 0L, 100L, metrics, time); - List<Thread> threads = new ArrayList<Thread>(); + List<Thread> threads = new ArrayList<>(); for (int i = 0; i < numThreads; i++) { threads.add(new Thread() { public void run() { @@ -182,8 +182,11 @@ public class RecordAccumulatorTest { List<RecordBatch> batches = accum.drain(cluster, nodes, 5 * 1024, 0).get(node1.id()); if (batches != null) { for (RecordBatch batch : batches) { - for (LogEntry entry : batch.records) + Iterator<LogEntry> deepEntries = batch.records().deepIterator(); + while (deepEntries.hasNext()) { + deepEntries.next(); read++; + } accum.deallocate(batch); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java b/clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java new file mode 100644 index 0000000..62e8a05 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ +package org.apache.kafka.common.record; + +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.Iterator; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class ByteBufferLogInputStreamTest { + + @Test + public void iteratorIgnoresIncompleteEntries() { + ByteBuffer buffer = ByteBuffer.allocate(2048); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V1, CompressionType.NONE, TimestampType.CREATE_TIME, 0L); + builder.append(0L, 15L, "a".getBytes(), "1".getBytes()); + builder.append(1L, 20L, "b".getBytes(), "2".getBytes()); + + ByteBuffer recordsBuffer = builder.build().buffer(); + recordsBuffer.limit(recordsBuffer.limit() - 5); + + Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = MemoryRecords.readableRecords(recordsBuffer).shallowIterator(); + assertTrue(iterator.hasNext()); + ByteBufferLogInputStream.ByteBufferLogEntry first = iterator.next(); + assertEquals(0L, first.offset()); + + assertFalse(iterator.hasNext()); + } + + @Test + public void testSetCreateTimeV1() { + ByteBuffer buffer = ByteBuffer.allocate(2048); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V1, CompressionType.NONE, TimestampType.CREATE_TIME, 0L); + builder.append(0L, 15L, "a".getBytes(), "1".getBytes()); + Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = builder.build().shallowIterator(); + + assertTrue(iterator.hasNext()); + ByteBufferLogInputStream.ByteBufferLogEntry entry = iterator.next(); + + long createTimeMs = 20L; + entry.setCreateTime(createTimeMs); + + assertEquals(TimestampType.CREATE_TIME, entry.record().timestampType()); + assertEquals(createTimeMs, entry.record().timestamp()); + } + + @Test(expected = IllegalArgumentException.class) + public void testSetCreateTimeNotAllowedV0() { + ByteBuffer buffer = ByteBuffer.allocate(2048); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V0, CompressionType.NONE, TimestampType.CREATE_TIME, 0L); + builder.append(0L, 15L, "a".getBytes(), "1".getBytes()); + Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = builder.build().shallowIterator(); + + assertTrue(iterator.hasNext()); + ByteBufferLogInputStream.ByteBufferLogEntry entry = iterator.next(); + + long createTimeMs = 20L; + entry.setCreateTime(createTimeMs); + } + + @Test + public void testSetLogAppendTimeV1() { + ByteBuffer buffer = ByteBuffer.allocate(2048); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V1, CompressionType.NONE, TimestampType.CREATE_TIME, 0L); + builder.append(0L, 15L, "a".getBytes(), "1".getBytes()); + Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = builder.build().shallowIterator(); + + assertTrue(iterator.hasNext()); + ByteBufferLogInputStream.ByteBufferLogEntry entry = iterator.next(); + + long logAppendTime = 20L; + entry.setLogAppendTime(logAppendTime); + + assertEquals(TimestampType.LOG_APPEND_TIME, entry.record().timestampType()); + assertEquals(logAppendTime, entry.record().timestamp()); + } + + @Test(expected = IllegalArgumentException.class) + public void testSetLogAppendTimeNotAllowedV0() { + ByteBuffer buffer = ByteBuffer.allocate(2048); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V0, CompressionType.NONE, TimestampType.CREATE_TIME, 0L); + builder.append(0L, 15L, "a".getBytes(), "1".getBytes()); + Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = builder.build().shallowIterator(); + + assertTrue(iterator.hasNext()); + ByteBufferLogInputStream.ByteBufferLogEntry entry = iterator.next(); + + long logAppendTime = 20L; + entry.setLogAppendTime(logAppendTime); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java new file mode 100644 index 0000000..7e2c256 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java @@ -0,0 +1,410 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ +package org.apache.kafka.common.record; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.test.TestUtils; +import org.easymock.EasyMock; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +import static org.apache.kafka.test.TestUtils.tempFile; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class FileRecordsTest { + + private Record[] records = new Record[] { + Record.create("abcd".getBytes()), + Record.create("efgh".getBytes()), + Record.create("ijkl".getBytes()) + }; + private FileRecords fileRecords; + + @Before + public void setup() throws IOException { + this.fileRecords = createFileRecords(records); + } + + /** + * Test that the cached size variable matches the actual file size as we append messages + */ + @Test + public void testFileSize() throws IOException { + assertEquals(fileRecords.channel().size(), fileRecords.sizeInBytes()); + for (int i = 0; i < 20; i++) { + fileRecords.append(MemoryRecords.withRecords(Record.create("abcd".getBytes()))); + assertEquals(fileRecords.channel().size(), fileRecords.sizeInBytes()); + } + } + + /** + * Test that adding invalid bytes to the end of the log doesn't break iteration + */ + @Test + public void testIterationOverPartialAndTruncation() throws IOException { + testPartialWrite(0, fileRecords); + testPartialWrite(2, fileRecords); + testPartialWrite(4, fileRecords); + testPartialWrite(5, fileRecords); + testPartialWrite(6, fileRecords); + } + + private void testPartialWrite(int size, FileRecords fileRecords) throws IOException { + ByteBuffer buffer = ByteBuffer.allocate(size); + for (int i = 0; i < size; i++) + buffer.put((byte) 0); + + buffer.rewind(); + + fileRecords.channel().write(buffer); + + // appending those bytes should not change the contents + TestUtils.checkEquals(Arrays.asList(records).iterator(), fileRecords.records()); + } + + /** + * Iterating over the file does file reads but shouldn't change the position of the underlying FileChannel. + */ + @Test + public void testIterationDoesntChangePosition() throws IOException { + long position = fileRecords.channel().position(); + TestUtils.checkEquals(Arrays.asList(records).iterator(), fileRecords.records()); + assertEquals(position, fileRecords.channel().position()); + } + + /** + * Test a simple append and read. + */ + @Test + public void testRead() throws IOException { + FileRecords read = fileRecords.read(0, fileRecords.sizeInBytes()); + TestUtils.checkEquals(fileRecords.shallowIterator(), read.shallowIterator()); + + List<LogEntry> items = shallowEntries(read); + LogEntry second = items.get(1); + + read = fileRecords.read(second.sizeInBytes(), fileRecords.sizeInBytes()); + assertEquals("Try a read starting from the second message", + items.subList(1, 3), shallowEntries(read)); + + read = fileRecords.read(second.sizeInBytes(), second.sizeInBytes()); + assertEquals("Try a read of a single message starting from the second message", + Collections.singletonList(second), shallowEntries(read)); + } + + /** + * Test the MessageSet.searchFor API. + */ + @Test + public void testSearch() throws IOException { + // append a new message with a high offset + Record lastMessage = Record.create("test".getBytes()); + fileRecords.append(MemoryRecords.withRecords(50L, lastMessage)); + + List<LogEntry> entries = shallowEntries(fileRecords); + int position = 0; + + int message1Size = entries.get(0).sizeInBytes(); + assertEquals("Should be able to find the first message by its offset", + new FileRecords.LogEntryPosition(0L, position, message1Size), + fileRecords.searchForOffsetWithSize(0, 0)); + position += message1Size; + + int message2Size = entries.get(1).sizeInBytes(); + assertEquals("Should be able to find second message when starting from 0", + new FileRecords.LogEntryPosition(1L, position, message2Size), + fileRecords.searchForOffsetWithSize(1, 0)); + assertEquals("Should be able to find second message starting from its offset", + new FileRecords.LogEntryPosition(1L, position, message2Size), + fileRecords.searchForOffsetWithSize(1, position)); + position += message2Size + entries.get(2).sizeInBytes(); + + int message4Size = entries.get(3).sizeInBytes(); + assertEquals("Should be able to find fourth message from a non-existant offset", + new FileRecords.LogEntryPosition(50L, position, message4Size), + fileRecords.searchForOffsetWithSize(3, position)); + assertEquals("Should be able to find fourth message by correct offset", + new FileRecords.LogEntryPosition(50L, position, message4Size), + fileRecords.searchForOffsetWithSize(50, position)); + } + + /** + * Test that the message set iterator obeys start and end slicing + */ + @Test + public void testIteratorWithLimits() throws IOException { + LogEntry entry = shallowEntries(fileRecords).get(1); + int start = fileRecords.searchForOffsetWithSize(1, 0).position; + int size = entry.sizeInBytes(); + FileRecords slice = fileRecords.read(start, size); + assertEquals(Collections.singletonList(entry), shallowEntries(slice)); + FileRecords slice2 = fileRecords.read(start, size - 1); + assertEquals(Collections.emptyList(), shallowEntries(slice2)); + } + + /** + * Test the truncateTo method lops off messages and appropriately updates the size + */ + @Test + public void testTruncate() throws IOException { + LogEntry entry = shallowEntries(fileRecords).get(0); + int end = fileRecords.searchForOffsetWithSize(1, 0).position; + fileRecords.truncateTo(end); + assertEquals(Collections.singletonList(entry), shallowEntries(fileRecords)); + assertEquals(entry.sizeInBytes(), fileRecords.sizeInBytes()); + } + + /** + * Test that truncateTo only calls truncate on the FileChannel if the size of the + * FileChannel is bigger than the target size. This is important because some JVMs + * change the mtime of the file, even if truncate should do nothing. + */ + @Test + public void testTruncateNotCalledIfSizeIsSameAsTargetSize() throws IOException { + FileChannel channelMock = EasyMock.createMock(FileChannel.class); + + EasyMock.expect(channelMock.size()).andReturn(42L).atLeastOnce(); + EasyMock.expect(channelMock.position(42L)).andReturn(null); + EasyMock.replay(channelMock); + + FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false); + fileRecords.truncateTo(42); + + EasyMock.verify(channelMock); + } + + /** + * Expect a KafkaException if targetSize is bigger than the size of + * the FileRecords. + */ + @Test + public void testTruncateNotCalledIfSizeIsBiggerThanTargetSize() throws IOException { + FileChannel channelMock = EasyMock.createMock(FileChannel.class); + + EasyMock.expect(channelMock.size()).andReturn(42L).atLeastOnce(); + EasyMock.expect(channelMock.position(42L)).andReturn(null); + EasyMock.replay(channelMock); + + FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false); + + try { + fileRecords.truncateTo(43); + fail("Should throw KafkaException"); + } catch (KafkaException e) { + // expected + } + + EasyMock.verify(channelMock); + } + + /** + * see #testTruncateNotCalledIfSizeIsSameAsTargetSize + */ + @Test + public void testTruncateIfSizeIsDifferentToTargetSize() throws IOException { + FileChannel channelMock = EasyMock.createMock(FileChannel.class); + + EasyMock.expect(channelMock.size()).andReturn(42L).atLeastOnce(); + EasyMock.expect(channelMock.position(42L)).andReturn(null).once(); + EasyMock.expect(channelMock.truncate(23L)).andReturn(null).once(); + EasyMock.expect(channelMock.position(23L)).andReturn(null).once(); + EasyMock.replay(channelMock); + + FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false); + fileRecords.truncateTo(23); + + EasyMock.verify(channelMock); + } + + /** + * Test the new FileRecords with pre allocate as true + */ + @Test + public void testPreallocateTrue() throws IOException { + File temp = tempFile(); + FileRecords fileRecords = FileRecords.open(temp, false, 512 * 1024 * 1024, true); + long position = fileRecords.channel().position(); + int size = fileRecords.sizeInBytes(); + assertEquals(0, position); + assertEquals(0, size); + assertEquals(512 * 1024 * 1024, temp.length()); + } + + /** + * Test the new FileRecords with pre allocate as false + */ + @Test + public void testPreallocateFalse() throws IOException { + File temp = tempFile(); + FileRecords set = FileRecords.open(temp, false, 512 * 1024 * 1024, false); + long position = set.channel().position(); + int size = set.sizeInBytes(); + assertEquals(0, position); + assertEquals(0, size); + assertEquals(0, temp.length()); + } + + /** + * Test the new FileRecords with pre allocate as true and file has been clearly shut down, the file will be truncate to end of valid data. + */ + @Test + public void testPreallocateClearShutdown() throws IOException { + File temp = tempFile(); + FileRecords set = FileRecords.open(temp, false, 512 * 1024 * 1024, true); + set.append(MemoryRecords.withRecords(records)); + + int oldPosition = (int) set.channel().position(); + int oldSize = set.sizeInBytes(); + assertEquals(fileRecords.sizeInBytes(), oldPosition); + assertEquals(fileRecords.sizeInBytes(), oldSize); + set.close(); + + File tempReopen = new File(temp.getAbsolutePath()); + FileRecords setReopen = FileRecords.open(tempReopen, true, 512 * 1024 * 1024, true); + int position = (int) setReopen.channel().position(); + int size = setReopen.sizeInBytes(); + + assertEquals(oldPosition, position); + assertEquals(oldPosition, size); + assertEquals(oldPosition, tempReopen.length()); + } + + @Test + public void testFormatConversionWithPartialMessage() throws IOException { + LogEntry entry = shallowEntries(fileRecords).get(1); + int start = fileRecords.searchForOffsetWithSize(1, 0).position; + int size = entry.sizeInBytes(); + FileRecords slice = fileRecords.read(start, size - 1); + Records messageV0 = slice.toMessageFormat(Record.MAGIC_VALUE_V0); + assertTrue("No message should be there", shallowEntries(messageV0).isEmpty()); + assertEquals("There should be " + (size - 1) + " bytes", size - 1, messageV0.sizeInBytes()); + } + + @Test + public void testConvertNonCompressedToMagic1() throws IOException { + List<LogEntry> entries = Arrays.asList( + LogEntry.create(0L, Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, "k1".getBytes(), "hello".getBytes())), + LogEntry.create(2L, Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, "k2".getBytes(), "goodbye".getBytes()))); + MemoryRecords records = MemoryRecords.withLogEntries(CompressionType.NONE, entries); + + // Up conversion. In reality we only do down conversion, but up conversion should work as well. + // up conversion for non-compressed messages + try (FileRecords fileRecords = FileRecords.open(tempFile())) { + fileRecords.append(records); + fileRecords.flush(); + Records convertedRecords = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V1); + verifyConvertedMessageSet(entries, convertedRecords, Record.MAGIC_VALUE_V1); + } + } + + @Test + public void testConvertCompressedToMagic1() throws IOException { + List<LogEntry> entries = Arrays.asList( + LogEntry.create(0L, Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, "k1".getBytes(), "hello".getBytes())), + LogEntry.create(2L, Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, "k2".getBytes(), "goodbye".getBytes()))); + MemoryRecords records = MemoryRecords.withLogEntries(CompressionType.GZIP, entries); + + // up conversion for compressed messages + try (FileRecords fileRecords = FileRecords.open(tempFile())) { + fileRecords.append(records); + fileRecords.flush(); + Records convertedRecords = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V1); + verifyConvertedMessageSet(entries, convertedRecords, Record.MAGIC_VALUE_V1); + } + } + + @Test + public void testConvertNonCompressedToMagic0() throws IOException { + List<LogEntry> entries = Arrays.asList( + LogEntry.create(0L, Record.create(Record.MAGIC_VALUE_V1, 1L, "k1".getBytes(), "hello".getBytes())), + LogEntry.create(2L, Record.create(Record.MAGIC_VALUE_V1, 2L, "k2".getBytes(), "goodbye".getBytes()))); + MemoryRecords records = MemoryRecords.withLogEntries(CompressionType.NONE, entries); + + // down conversion for non-compressed messages + try (FileRecords fileRecords = FileRecords.open(tempFile())) { + fileRecords.append(records); + fileRecords.flush(); + Records convertedRecords = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V0); + verifyConvertedMessageSet(entries, convertedRecords, Record.MAGIC_VALUE_V0); + } + } + + @Test + public void testConvertCompressedToMagic0() throws IOException { + List<LogEntry> entries = Arrays.asList( + LogEntry.create(0L, Record.create(Record.MAGIC_VALUE_V1, 1L, "k1".getBytes(), "hello".getBytes())), + LogEntry.create(2L, Record.create(Record.MAGIC_VALUE_V1, 2L, "k2".getBytes(), "goodbye".getBytes()))); + MemoryRecords records = MemoryRecords.withLogEntries(CompressionType.GZIP, entries); + + // down conversion for compressed messages + try (FileRecords fileRecords = FileRecords.open(tempFile())) { + fileRecords.append(records); + fileRecords.flush(); + Records convertedRecords = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V0); + verifyConvertedMessageSet(entries, convertedRecords, Record.MAGIC_VALUE_V0); + } + } + + private void verifyConvertedMessageSet(List<LogEntry> initialEntries, Records convertedRecords, byte magicByte) { + int i = 0; + for (LogEntry logEntry : deepEntries(convertedRecords)) { + assertEquals("magic byte should be " + magicByte, magicByte, logEntry.record().magic()); + assertEquals("offset should not change", initialEntries.get(i).offset(), logEntry.offset()); + assertEquals("key should not change", initialEntries.get(i).record().key(), logEntry.record().key()); + assertEquals("payload should not change", initialEntries.get(i).record().value(), logEntry.record().value()); + i += 1; + } + } + + private static List<LogEntry> shallowEntries(Records buffer) { + List<LogEntry> entries = new ArrayList<>(); + Iterator<? extends LogEntry> iterator = buffer.shallowIterator(); + while (iterator.hasNext()) + entries.add(iterator.next()); + return entries; + } + + private static List<LogEntry> deepEntries(Records buffer) { + List<LogEntry> entries = new ArrayList<>(); + Iterator<? extends LogEntry> iterator = buffer.shallowIterator(); + while (iterator.hasNext()) { + for (LogEntry deepEntry : iterator.next()) + entries.add(deepEntry); + } + return entries; + } + + private FileRecords createFileRecords(Record ... records) throws IOException { + FileRecords fileRecords = FileRecords.open(tempFile()); + fileRecords.append(MemoryRecords.withRecords(records)); + fileRecords.flush(); + return fileRecords; + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java new file mode 100644 index 0000000..40fa212 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java @@ -0,0 +1,253 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ +package org.apache.kafka.common.record; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +@RunWith(value = Parameterized.class) +public class MemoryRecordsBuilderTest { + + private final CompressionType compressionType; + private final int bufferOffset; + + public MemoryRecordsBuilderTest(int bufferOffset, CompressionType compressionType) { + this.bufferOffset = bufferOffset; + this.compressionType = compressionType; + } + + @Test + public void testCompressionRateV0() { + ByteBuffer buffer = ByteBuffer.allocate(1024); + buffer.position(bufferOffset); + + Record[] records = new Record[] { + Record.create(Record.MAGIC_VALUE_V0, 0L, "a".getBytes(), "1".getBytes()), + Record.create(Record.MAGIC_VALUE_V0, 1L, "b".getBytes(), "2".getBytes()), + Record.create(Record.MAGIC_VALUE_V0, 2L, "c".getBytes(), "3".getBytes()), + }; + + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V0, compressionType, + TimestampType.CREATE_TIME, 0L, 0L, buffer.capacity()); + + int uncompressedSize = 0; + long offset = 0L; + for (Record record : records) { + uncompressedSize += record.sizeInBytes() + Records.LOG_OVERHEAD; + builder.append(offset++, record); + } + + MemoryRecords built = builder.build(); + if (compressionType == CompressionType.NONE) { + assertEquals(1.0, builder.compressionRate(), 0.00001); + } else { + int compressedSize = built.sizeInBytes() - Records.LOG_OVERHEAD - Record.RECORD_OVERHEAD_V0; + double computedCompressionRate = (double) compressedSize / uncompressedSize; + assertEquals(computedCompressionRate, builder.compressionRate(), 0.00001); + } + } + + @Test + public void testCompressionRateV1() { + ByteBuffer buffer = ByteBuffer.allocate(1024); + buffer.position(bufferOffset); + + Record[] records = new Record[] { + Record.create(Record.MAGIC_VALUE_V1, 0L, "a".getBytes(), "1".getBytes()), + Record.create(Record.MAGIC_VALUE_V1, 1L, "b".getBytes(), "2".getBytes()), + Record.create(Record.MAGIC_VALUE_V1, 2L, "c".getBytes(), "3".getBytes()), + }; + + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType, + TimestampType.CREATE_TIME, 0L, 0L, buffer.capacity()); + + int uncompressedSize = 0; + long offset = 0L; + for (Record record : records) { + uncompressedSize += record.sizeInBytes() + Records.LOG_OVERHEAD; + builder.append(offset++, record); + } + + MemoryRecords built = builder.build(); + if (compressionType == CompressionType.NONE) { + assertEquals(1.0, builder.compressionRate(), 0.00001); + } else { + int compressedSize = built.sizeInBytes() - Records.LOG_OVERHEAD - Record.RECORD_OVERHEAD_V1; + double computedCompressionRate = (double) compressedSize / uncompressedSize; + assertEquals(computedCompressionRate, builder.compressionRate(), 0.00001); + } + } + + @Test + public void buildUsingLogAppendTime() { + ByteBuffer buffer = ByteBuffer.allocate(1024); + buffer.position(bufferOffset); + + long logAppendTime = System.currentTimeMillis(); + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType, + TimestampType.LOG_APPEND_TIME, 0L, logAppendTime, buffer.capacity()); + builder.append(0L, 0L, "a".getBytes(), "1".getBytes()); + builder.append(1L, 0L, "b".getBytes(), "2".getBytes()); + builder.append(2L, 0L, "c".getBytes(), "3".getBytes()); + MemoryRecords records = builder.build(); + + MemoryRecordsBuilder.RecordsInfo info = builder.info(); + assertEquals(logAppendTime, info.maxTimestamp); + + assertEquals(2L, info.shallowOffsetOfMaxTimestamp); + + Iterator<Record> iterator = records.records(); + while (iterator.hasNext()) { + Record record = iterator.next(); + assertEquals(TimestampType.LOG_APPEND_TIME, record.timestampType()); + assertEquals(logAppendTime, record.timestamp()); + } + } + + @Test + public void convertUsingLogAppendTime() { + ByteBuffer buffer = ByteBuffer.allocate(1024); + buffer.position(bufferOffset); + + long logAppendTime = System.currentTimeMillis(); + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType, + TimestampType.LOG_APPEND_TIME, 0L, logAppendTime, buffer.capacity()); + + builder.convertAndAppend(0L, Record.create(Record.MAGIC_VALUE_V0, 0L, "a".getBytes(), "1".getBytes())); + builder.convertAndAppend(1L, Record.create(Record.MAGIC_VALUE_V0, 0L, "b".getBytes(), "2".getBytes())); + builder.convertAndAppend(2L, Record.create(Record.MAGIC_VALUE_V0, 0L, "c".getBytes(), "3".getBytes())); + MemoryRecords records = builder.build(); + + MemoryRecordsBuilder.RecordsInfo info = builder.info(); + assertEquals(logAppendTime, info.maxTimestamp); + + assertEquals(2L, info.shallowOffsetOfMaxTimestamp); + + Iterator<Record> iterator = records.records(); + while (iterator.hasNext()) { + Record record = iterator.next(); + assertEquals(TimestampType.LOG_APPEND_TIME, record.timestampType()); + assertEquals(logAppendTime, record.timestamp()); + } + } + + @Test + public void buildUsingCreateTime() { + ByteBuffer buffer = ByteBuffer.allocate(1024); + buffer.position(bufferOffset); + + long logAppendTime = System.currentTimeMillis(); + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType, + TimestampType.CREATE_TIME, 0L, logAppendTime, buffer.capacity()); + builder.append(0L, 0L, "a".getBytes(), "1".getBytes()); + builder.append(1L, 2L, "b".getBytes(), "2".getBytes()); + builder.append(2L, 1L, "c".getBytes(), "3".getBytes()); + MemoryRecords records = builder.build(); + + MemoryRecordsBuilder.RecordsInfo info = builder.info(); + assertEquals(2L, info.maxTimestamp); + + if (compressionType == CompressionType.NONE) + assertEquals(1L, info.shallowOffsetOfMaxTimestamp); + else + assertEquals(2L, info.shallowOffsetOfMaxTimestamp); + + Iterator<Record> iterator = records.records(); + int i = 0; + long[] expectedTimestamps = new long[] {0L, 2L, 1L}; + while (iterator.hasNext()) { + Record record = iterator.next(); + assertEquals(TimestampType.CREATE_TIME, record.timestampType()); + assertEquals(expectedTimestamps[i++], record.timestamp()); + } + } + + @Test + public void writePastLimit() { + ByteBuffer buffer = ByteBuffer.allocate(64); + buffer.position(bufferOffset); + + long logAppendTime = System.currentTimeMillis(); + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType, + TimestampType.CREATE_TIME, 0L, logAppendTime, buffer.capacity()); + builder.append(0L, 0L, "a".getBytes(), "1".getBytes()); + builder.append(1L, 1L, "b".getBytes(), "2".getBytes()); + + assertFalse(builder.hasRoomFor("c".getBytes(), "3".getBytes())); + builder.append(2L, 2L, "c".getBytes(), "3".getBytes()); + MemoryRecords records = builder.build(); + + MemoryRecordsBuilder.RecordsInfo info = builder.info(); + assertEquals(2L, info.maxTimestamp); + assertEquals(2L, info.shallowOffsetOfMaxTimestamp); + + Iterator<Record> iterator = records.records(); + long i = 0L; + while (iterator.hasNext()) { + Record record = iterator.next(); + assertEquals(TimestampType.CREATE_TIME, record.timestampType()); + assertEquals(i++, record.timestamp()); + } + } + + @Test + public void convertUsingCreateTime() { + ByteBuffer buffer = ByteBuffer.allocate(1024); + buffer.position(bufferOffset); + + long logAppendTime = System.currentTimeMillis(); + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType, + TimestampType.CREATE_TIME, 0L, logAppendTime, buffer.capacity()); + + builder.convertAndAppend(0L, Record.create(Record.MAGIC_VALUE_V0, 0L, "a".getBytes(), "1".getBytes())); + builder.convertAndAppend(0L, Record.create(Record.MAGIC_VALUE_V0, 0L, "b".getBytes(), "2".getBytes())); + builder.convertAndAppend(0L, Record.create(Record.MAGIC_VALUE_V0, 0L, "c".getBytes(), "3".getBytes())); + MemoryRecords records = builder.build(); + + MemoryRecordsBuilder.RecordsInfo info = builder.info(); + assertEquals(Record.NO_TIMESTAMP, info.maxTimestamp); + assertEquals(0L, info.shallowOffsetOfMaxTimestamp); + + Iterator<Record> iterator = records.records(); + while (iterator.hasNext()) { + Record record = iterator.next(); + assertEquals(TimestampType.CREATE_TIME, record.timestampType()); + assertEquals(Record.NO_TIMESTAMP, record.timestamp()); + } + } + + @Parameterized.Parameters + public static Collection<Object[]> data() { + List<Object[]> values = new ArrayList<>(); + for (int bufferOffset : Arrays.asList(0, 15)) + for (CompressionType compressionType : CompressionType.values()) + values.add(new Object[] {bufferOffset, compressionType}); + return values; + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java index b1117f1..ef0fbeb 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java @@ -16,53 +16,64 @@ */ package org.apache.kafka.common.record; -import static org.apache.kafka.common.utils.Utils.toArray; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import org.apache.kafka.test.TestUtils; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Iterator; import java.util.List; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; +import static java.util.Arrays.asList; +import static org.apache.kafka.common.utils.Utils.toNullableArray; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; @RunWith(value = Parameterized.class) public class MemoryRecordsTest { private CompressionType compression; + private byte magic; + private long firstOffset; - public MemoryRecordsTest(CompressionType compression) { + public MemoryRecordsTest(byte magic, long firstOffset, CompressionType compression) { + this.magic = magic; this.compression = compression; + this.firstOffset = firstOffset; } @Test public void testIterator() { - MemoryRecords recs1 = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), compression); - MemoryRecords recs2 = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), compression); - List<Record> list = Arrays.asList(new Record(0L, "a".getBytes(), "1".getBytes()), - new Record(0L, "b".getBytes(), "2".getBytes()), - new Record(0L, "c".getBytes(), "3".getBytes())); + MemoryRecordsBuilder builder1 = MemoryRecords.builder(ByteBuffer.allocate(1024), magic, compression, TimestampType.CREATE_TIME, firstOffset); + MemoryRecordsBuilder builder2 = MemoryRecords.builder(ByteBuffer.allocate(1024), magic, compression, TimestampType.CREATE_TIME, firstOffset); + List<Record> list = asList( + Record.create(magic, 1L, "a".getBytes(), "1".getBytes()), + Record.create(magic, 2L, "b".getBytes(), "2".getBytes()), + Record.create(magic, 3L, "c".getBytes(), "3".getBytes()), + Record.create(magic, 4L, null, "4".getBytes()), + Record.create(magic, 5L, "e".getBytes(), null), + Record.create(magic, 6L, null, null)); + for (int i = 0; i < list.size(); i++) { Record r = list.get(i); - recs1.append(i, r); - recs2.append(i, 0L, toArray(r.key()), toArray(r.value())); + builder1.append(firstOffset + i, r); + builder2.append(firstOffset + i, i + 1, toNullableArray(r.key()), toNullableArray(r.value())); } - recs1.close(); - recs2.close(); + + MemoryRecords recs1 = builder1.build(); + MemoryRecords recs2 = builder2.build(); for (int iteration = 0; iteration < 2; iteration++) { - for (MemoryRecords recs : Arrays.asList(recs1, recs2)) { - Iterator<LogEntry> iter = recs.iterator(); + for (MemoryRecords recs : asList(recs1, recs2)) { + Iterator<LogEntry> iter = recs.deepIterator(); for (int i = 0; i < list.size(); i++) { assertTrue(iter.hasNext()); LogEntry entry = iter.next(); - assertEquals((long) i, entry.offset()); + assertEquals(firstOffset + i, entry.offset()); assertEquals(list.get(i), entry.record()); entry.record().ensureValid(); } @@ -73,20 +84,145 @@ public class MemoryRecordsTest { @Test public void testHasRoomForMethod() { - MemoryRecords recs1 = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), compression); - recs1.append(0, new Record(0L, "a".getBytes(), "1".getBytes())); + MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), magic, compression, TimestampType.CREATE_TIME); + builder.append(0, Record.create(magic, 0L, "a".getBytes(), "1".getBytes())); + + assertTrue(builder.hasRoomFor("b".getBytes(), "2".getBytes())); + builder.close(); + assertFalse(builder.hasRoomFor("b".getBytes(), "2".getBytes())); + } + + @Test + public void testFilterTo() { + ByteBuffer buffer = ByteBuffer.allocate(2048); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME); + builder.append(0L, 10L, null, "a".getBytes()); + builder.close(); + + builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 1L); + builder.append(1L, 11L, "1".getBytes(), "b".getBytes()); + builder.append(2L, 12L, null, "c".getBytes()); + builder.close(); - assertTrue(recs1.hasRoomFor("b".getBytes(), "2".getBytes())); - recs1.close(); - assertFalse(recs1.hasRoomFor("b".getBytes(), "2".getBytes())); + builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 3L); + builder.append(3L, 13L, null, "d".getBytes()); + builder.append(4L, 20L, "4".getBytes(), "e".getBytes()); + builder.append(5L, 15L, "5".getBytes(), "f".getBytes()); + builder.close(); + + builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 6L); + builder.append(6L, 16L, "6".getBytes(), "g".getBytes()); + builder.close(); + + buffer.flip(); + + ByteBuffer filtered = ByteBuffer.allocate(2048); + MemoryRecords.FilterResult result = MemoryRecords.readableRecords(buffer).filterTo(new RetainNonNullKeysFilter(), filtered); + + filtered.flip(); + + assertEquals(7, result.messagesRead); + assertEquals(4, result.messagesRetained); + assertEquals(buffer.limit(), result.bytesRead); + assertEquals(filtered.limit(), result.bytesRetained); + if (magic > 0) { + assertEquals(20L, result.maxTimestamp); + if (compression == CompressionType.NONE) + assertEquals(4L, result.shallowOffsetOfMaxTimestamp); + else + assertEquals(5L, result.shallowOffsetOfMaxTimestamp); + } + MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered); + + List<ByteBufferLogInputStream.ByteBufferLogEntry> shallowEntries = TestUtils.toList(filteredRecords.shallowIterator()); + List<Long> expectedOffsets = compression == CompressionType.NONE ? asList(1L, 4L, 5L, 6L) : asList(1L, 5L, 6L); + assertEquals(expectedOffsets.size(), shallowEntries.size()); + + for (int i = 0; i < expectedOffsets.size(); i++) { + LogEntry shallowEntry = shallowEntries.get(i); + assertEquals(expectedOffsets.get(i).longValue(), shallowEntry.offset()); + assertEquals(magic, shallowEntry.record().magic()); + assertEquals(compression, shallowEntry.record().compressionType()); + assertEquals(magic == Record.MAGIC_VALUE_V0 ? TimestampType.NO_TIMESTAMP_TYPE : TimestampType.CREATE_TIME, + shallowEntry.record().timestampType()); + } + + List<LogEntry> deepEntries = TestUtils.toList(filteredRecords.deepIterator()); + assertEquals(4, deepEntries.size()); + + LogEntry first = deepEntries.get(0); + assertEquals(1L, first.offset()); + assertEquals(Record.create(magic, 11L, "1".getBytes(), "b".getBytes()), first.record()); + + LogEntry second = deepEntries.get(1); + assertEquals(4L, second.offset()); + assertEquals(Record.create(magic, 20L, "4".getBytes(), "e".getBytes()), second.record()); + + LogEntry third = deepEntries.get(2); + assertEquals(5L, third.offset()); + assertEquals(Record.create(magic, 15L, "5".getBytes(), "f".getBytes()), third.record()); + + LogEntry fourth = deepEntries.get(3); + assertEquals(6L, fourth.offset()); + assertEquals(Record.create(magic, 16L, "6".getBytes(), "g".getBytes()), fourth.record()); + } + + @Test + public void testFilterToPreservesLogAppendTime() { + long logAppendTime = System.currentTimeMillis(); + + ByteBuffer buffer = ByteBuffer.allocate(2048); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, + TimestampType.LOG_APPEND_TIME, 0L, logAppendTime); + builder.append(0L, 10L, null, "a".getBytes()); + builder.close(); + + builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.LOG_APPEND_TIME, 1L, logAppendTime); + builder.append(1L, 11L, "1".getBytes(), "b".getBytes()); + builder.append(2L, 12L, null, "c".getBytes()); + builder.close(); + + builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.LOG_APPEND_TIME, 3L, logAppendTime); + builder.append(3L, 13L, null, "d".getBytes()); + builder.append(4L, 14L, "4".getBytes(), "e".getBytes()); + builder.append(5L, 15L, "5".getBytes(), "f".getBytes()); + builder.close(); + + buffer.flip(); + + ByteBuffer filtered = ByteBuffer.allocate(2048); + MemoryRecords.readableRecords(buffer).filterTo(new RetainNonNullKeysFilter(), filtered); + + filtered.flip(); + MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered); + + List<ByteBufferLogInputStream.ByteBufferLogEntry> shallowEntries = TestUtils.toList(filteredRecords.shallowIterator()); + assertEquals(compression == CompressionType.NONE ? 3 : 2, shallowEntries.size()); + + for (LogEntry shallowEntry : shallowEntries) { + assertEquals(compression, shallowEntry.record().compressionType()); + if (magic > Record.MAGIC_VALUE_V0) { + assertEquals(TimestampType.LOG_APPEND_TIME, shallowEntry.record().timestampType()); + assertEquals(logAppendTime, shallowEntry.record().timestamp()); + } + } } @Parameterized.Parameters public static Collection<Object[]> data() { - List<Object[]> values = new ArrayList<Object[]>(); - for (CompressionType type: CompressionType.values()) - values.add(new Object[] {type}); + List<Object[]> values = new ArrayList<>(); + for (long firstOffset : asList(0L, 57L)) + for (byte magic : asList(Record.MAGIC_VALUE_V0, Record.MAGIC_VALUE_V1)) + for (CompressionType type: CompressionType.values()) + values.add(new Object[] {magic, firstOffset, type}); return values; } + + private static class RetainNonNullKeysFilter implements MemoryRecords.LogEntryFilter { + @Override + public boolean shouldRetain(LogEntry entry) { + return entry.record().hasKey(); + } + } }
