Updated Branches: refs/heads/trunk c9052c5ff -> 253f86e31
http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/common/network/ByteBufferReceive.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/kafka/common/network/ByteBufferReceive.java b/clients/src/main/java/kafka/common/network/ByteBufferReceive.java index cb1aaae..65a7c64 100644 --- a/clients/src/main/java/kafka/common/network/ByteBufferReceive.java +++ b/clients/src/main/java/kafka/common/network/ByteBufferReceive.java @@ -33,7 +33,9 @@ public class ByteBufferReceive implements Receive { @Override public long readFrom(ScatteringByteChannel channel) throws IOException { - return channel.read(buffers); + long read = channel.read(buffers); + remaining += read; + return read; } public ByteBuffer[] reify() { http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/common/protocol/Errors.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/kafka/common/protocol/Errors.java b/clients/src/main/java/kafka/common/protocol/Errors.java index fb1a3e5..402a6c0 100644 --- a/clients/src/main/java/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/kafka/common/protocol/Errors.java @@ -4,9 +4,9 @@ import java.util.HashMap; import java.util.Map; import kafka.common.errors.ApiException; -import kafka.common.errors.CorruptMessageException; +import kafka.common.errors.CorruptRecordException; import kafka.common.errors.LeaderNotAvailableException; -import kafka.common.errors.MessageTooLargeException; +import kafka.common.errors.RecordTooLargeException; import kafka.common.errors.NetworkException; import kafka.common.errors.NotLeaderForPartitionException; import kafka.common.errors.OffsetMetadataTooLarge; @@ -27,14 +27,14 @@ public enum Errors { OFFSET_OUT_OF_RANGE(1, new OffsetOutOfRangeException("The requested offset is not within the range of offsets maintained by the server.")), CORRUPT_MESSAGE(2, - new CorruptMessageException("The message contents does not match the message CRC or the message is otherwise corrupt.")), + new CorruptRecordException("The message contents does not match the message CRC or the message is otherwise corrupt.")), UNKNOWN_TOPIC_OR_PARTITION(3, new UnknownTopicOrPartitionException("This server does not host this topic-partition.")), LEADER_NOT_AVAILABLE(5, new LeaderNotAvailableException("There is no leader for this topic-partition as we are in the middle of a leadership election.")), NOT_LEADER_FOR_PARTITION(6, new NotLeaderForPartitionException("This server is not the leader for that topic-partition.")), REQUEST_TIMED_OUT(7, new TimeoutException("The request timed out.")), MESSAGE_TOO_LARGE(10, - new MessageTooLargeException("The request included a message larger than the max message size the server will accept.")), + new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")), OFFSET_METADATA_TOO_LARGE(12, new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")), NETWORK_EXCEPTION(13, new NetworkException("The server disconnected before a response was received.")); http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/common/protocol/ProtoUtils.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/kafka/common/protocol/ProtoUtils.java b/clients/src/main/java/kafka/common/protocol/ProtoUtils.java index 83dad53..576c24d 100644 --- a/clients/src/main/java/kafka/common/protocol/ProtoUtils.java +++ b/clients/src/main/java/kafka/common/protocol/ProtoUtils.java @@ -2,7 +2,9 @@ package kafka.common.protocol; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import kafka.common.Cluster; import kafka.common.Node; @@ -52,14 +54,14 @@ public class ProtoUtils { } public static Cluster parseMetadataResponse(Struct response) { - List<Node> brokers = new ArrayList<Node>(); + Map<Integer, Node> brokers = new HashMap<Integer, Node>(); Object[] brokerStructs = (Object[]) response.get("brokers"); for (int i = 0; i < brokerStructs.length; i++) { Struct broker = (Struct) brokerStructs[i]; int nodeId = (Integer) broker.get("node_id"); String host = (String) broker.get("host"); int port = (Integer) broker.get("port"); - brokers.add(new Node(nodeId, host, port)); + brokers.put(nodeId, new Node(nodeId, host, port)); } List<PartitionInfo> partitions = new ArrayList<PartitionInfo>(); Object[] topicInfos = (Object[]) response.get("topic_metadata"); @@ -75,21 +77,21 @@ public class ProtoUtils { if (partError == Errors.NONE.code()) { int partition = partitionInfo.getInt("partition_id"); int leader = partitionInfo.getInt("leader"); - int[] replicas = intArray((Object[]) partitionInfo.get("replicas")); - int[] isr = intArray((Object[]) partitionInfo.get("isr")); - partitions.add(new PartitionInfo(topic, partition, leader, replicas, isr)); + Node leaderNode = leader == -1 ? null : brokers.get(leader); + Object[] replicas = (Object[]) partitionInfo.get("replicas"); + Node[] replicaNodes = new Node[replicas.length]; + for (int k = 0; k < replicas.length; k++) + replicaNodes[k] = brokers.get(replicas[k]); + Object[] isr = (Object[]) partitionInfo.get("isr"); + Node[] isrNodes = new Node[isr.length]; + for (int k = 0; k < isr.length; k++) + isrNodes[k] = brokers.get(isr[k]); + partitions.add(new PartitionInfo(topic, partition, leaderNode, replicaNodes, isrNodes)); } } } } - return new Cluster(brokers, partitions); - } - - private static int[] intArray(Object[] ints) { - int[] copy = new int[ints.length]; - for (int i = 0; i < ints.length; i++) - copy[i] = (Integer) ints[i]; - return copy; + return new Cluster(brokers.values(), partitions); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/common/protocol/Protocol.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/kafka/common/protocol/Protocol.java b/clients/src/main/java/kafka/common/protocol/Protocol.java index e191d6a..49b60aa 100644 --- a/clients/src/main/java/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/kafka/common/protocol/Protocol.java @@ -66,7 +66,7 @@ public class Protocol { public static Schema TOPIC_PRODUCE_DATA_V0 = new Schema(new Field("topic", STRING), new Field("data", new ArrayOf(new Schema(new Field("partition", INT32), - new Field("message_set", BYTES))))); + new Field("record_set", BYTES))))); public static Schema PRODUCE_REQUEST_V0 = new Schema(new Field("acks", INT16, http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/common/record/MemoryRecords.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/kafka/common/record/MemoryRecords.java b/clients/src/main/java/kafka/common/record/MemoryRecords.java index ec98226..d3f8426 100644 --- a/clients/src/main/java/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/kafka/common/record/MemoryRecords.java @@ -48,7 +48,7 @@ public class MemoryRecords implements Records { return this.buffer.remaining() >= Records.LOG_OVERHEAD + Record.recordSize(key, value); } - /** Write the messages in this set to the given channel */ + /** Write the records in this set to the given channel */ public int writeTo(GatheringByteChannel channel) throws IOException { return channel.write(buffer); } @@ -89,7 +89,7 @@ public class MemoryRecords implements Records { long offset = buffer.getLong(); int size = buffer.getInt(); if (size < 0) - throw new IllegalStateException("Message with size " + size); + throw new IllegalStateException("Record with size " + size); if (buffer.remaining() < size) return allDone(); ByteBuffer rec = buffer.slice(); http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/common/record/Record.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/kafka/common/record/Record.java b/clients/src/main/java/kafka/common/record/Record.java index 835a0a4..b89accf 100644 --- a/clients/src/main/java/kafka/common/record/Record.java +++ b/clients/src/main/java/kafka/common/record/Record.java @@ -162,7 +162,7 @@ public final class Record { } /** - * Throw an InvalidMessageException if isValid is false for this record + * Throw an InvalidRecordException if isValid is false for this record */ public void ensureValid() { if (!isValid()) @@ -260,7 +260,7 @@ public final class Record { } public String toString() { - return String.format("Message(magic = %d, attributes = %d, crc = %d, key = %d bytes, value = %d bytes)", + return String.format("Record(magic = %d, attributes = %d, crc = %d, key = %d bytes, value = %d bytes)", magic(), attributes(), checksum(), http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/test/java/kafka/clients/producer/MetadataTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/kafka/clients/producer/MetadataTest.java b/clients/src/test/java/kafka/clients/producer/MetadataTest.java index 68e4bd7..dd45209 100644 --- a/clients/src/test/java/kafka/clients/producer/MetadataTest.java +++ b/clients/src/test/java/kafka/clients/producer/MetadataTest.java @@ -1,12 +1,10 @@ package kafka.clients.producer; -import static java.util.Arrays.asList; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import kafka.clients.producer.internals.Metadata; import kafka.common.Cluster; -import kafka.common.Node; -import kafka.common.PartitionInfo; +import kafka.test.TestUtils; import org.junit.Test; @@ -30,7 +28,7 @@ public class MetadataTest { Thread t2 = asyncFetch(topic); assertTrue("Awaiting update", t1.isAlive()); assertTrue("Awaiting update", t2.isAlive()); - metadata.update(clusterWith(topic), time); + metadata.update(TestUtils.singletonCluster(topic, 1), time); t1.join(); t2.join(); assertFalse("No update needed.", metadata.needsUpdate(time)); @@ -38,10 +36,6 @@ public class MetadataTest { assertTrue("Update needed due to stale metadata.", metadata.needsUpdate(time)); } - private Cluster clusterWith(String topic) { - return new Cluster(asList(new Node(0, "localhost", 1969)), asList(new PartitionInfo(topic, 0, 0, new int[0], new int[0]))); - } - private Thread asyncFetch(final String topic) { Thread thread = new Thread() { public void run() { http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/test/java/kafka/clients/producer/MockProducerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/kafka/clients/producer/MockProducerTest.java index 61929a4..24b132f 100644 --- a/clients/src/test/java/kafka/clients/producer/MockProducerTest.java +++ b/clients/src/test/java/kafka/clients/producer/MockProducerTest.java @@ -5,62 +5,59 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import kafka.common.Cluster; -import kafka.common.Node; -import kafka.common.PartitionInfo; -import kafka.common.Serializer; -import kafka.common.StringSerialization; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import org.junit.Test; public class MockProducerTest { + private String topic = "topic"; + @Test - public void testAutoCompleteMock() { + public void testAutoCompleteMock() throws Exception { MockProducer producer = new MockProducer(true); - ProducerRecord record = new ProducerRecord("topic", "key", "value"); - RecordSend send = producer.send(record); - assertTrue("Send should be immediately complete", send.completed()); - assertFalse("Send should be successful", send.hasError()); - assertEquals("Offset should be 0", 0, send.offset()); + ProducerRecord record = new ProducerRecord(topic, "key".getBytes(), "value".getBytes()); + Future<RecordMetadata> metadata = producer.send(record); + assertTrue("Send should be immediately complete", metadata.isDone()); + assertFalse("Send should be successful", isError(metadata)); + assertEquals("Offset should be 0", 0, metadata.get().offset()); + assertEquals(topic, metadata.get().topic()); assertEquals("We should have the record in our history", asList(record), producer.history()); producer.clear(); assertEquals("Clear should erase our history", 0, producer.history().size()); } - public void testManualCompletion() { + @Test + public void testManualCompletion() throws Exception { MockProducer producer = new MockProducer(false); - ProducerRecord record1 = new ProducerRecord("topic", "key1", "value1"); - ProducerRecord record2 = new ProducerRecord("topic", "key2", "value2"); - RecordSend send1 = producer.send(record1); - assertFalse("Send shouldn't have completed", send1.completed()); - RecordSend send2 = producer.send(record2); - assertFalse("Send shouldn't have completed", send2.completed()); + ProducerRecord record1 = new ProducerRecord("topic", "key1".getBytes(), "value1".getBytes()); + ProducerRecord record2 = new ProducerRecord("topic", "key2".getBytes(), "value2".getBytes()); + Future<RecordMetadata> md1 = producer.send(record1); + assertFalse("Send shouldn't have completed", md1.isDone()); + Future<RecordMetadata> md2 = producer.send(record2); + assertFalse("Send shouldn't have completed", md2.isDone()); assertTrue("Complete the first request", producer.completeNext()); - assertFalse("Requst should be successful", send1.hasError()); - assertFalse("Second request still incomplete", send2.completed()); + assertFalse("Requst should be successful", isError(md1)); + assertFalse("Second request still incomplete", md2.isDone()); IllegalArgumentException e = new IllegalArgumentException("blah"); assertTrue("Complete the second request with an error", producer.errorNext(e)); try { - send2.await(); + md2.get(); fail("Expected error to be thrown"); - } catch (IllegalArgumentException err) { - // this is good + } catch (ExecutionException err) { + assertEquals(e, err.getCause()); } assertFalse("No more requests to complete", producer.completeNext()); } - public void testSerializationAndPartitioning() { - Cluster cluster = new Cluster(asList(new Node(0, "host", -1)), asList(new PartitionInfo("topic", - 0, - 0, - new int[] { 0 }, - new int[] { 0 }))); - Serializer serializer = new StringSerialization(); - Partitioner partitioner = new DefaultPartitioner(); - MockProducer producer = new MockProducer(serializer, serializer, partitioner, cluster, true); - ProducerRecord record = new ProducerRecord("topic", "key", "value"); - RecordSend send = producer.send(record); - assertTrue("Send should be immediately complete", send.completed()); + private boolean isError(Future<?> future) { + try { + future.get(); + return false; + } catch (Exception e) { + return true; + } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/test/java/kafka/clients/producer/PartitionerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/kafka/clients/producer/PartitionerTest.java b/clients/src/test/java/kafka/clients/producer/PartitionerTest.java new file mode 100644 index 0000000..c18da76 --- /dev/null +++ b/clients/src/test/java/kafka/clients/producer/PartitionerTest.java @@ -0,0 +1,54 @@ +package kafka.clients.producer; + +import static java.util.Arrays.asList; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.List; + +import kafka.clients.producer.internals.Partitioner; +import kafka.common.Cluster; +import kafka.common.Node; +import kafka.common.PartitionInfo; + +import org.junit.Test; + +public class PartitionerTest { + + private byte[] key = "key".getBytes(); + private byte[] value = "value".getBytes(); + private Partitioner partitioner = new Partitioner(); + private Node node0 = new Node(0, "localhost", 99); + private Node node1 = new Node(1, "localhost", 100); + private Node node2 = new Node(2, "localhost", 101); + private Node[] nodes = new Node[] { node0, node1, node2 }; + private String topic = "test"; + private List<PartitionInfo> partitions = asList(new PartitionInfo(topic, 0, node0, nodes, nodes), + new PartitionInfo(topic, 1, node1, nodes, nodes), + new PartitionInfo(topic, 2, null, nodes, nodes)); + private Cluster cluster = new Cluster(asList(node0, node1, node2), partitions); + + @Test + public void testUserSuppliedPartitioning() { + assertEquals("If the user supplies a partition we should use it.", + 0, + partitioner.partition(new ProducerRecord("test", 0, key, value), cluster)); + } + + @Test + public void testKeyPartitionIsStable() { + int partition = partitioner.partition(new ProducerRecord("test", key, value), cluster); + assertEquals("Same key should yield same partition", + partition, + partitioner.partition(new ProducerRecord("test", key, "value2".getBytes()), cluster)); + } + + @Test + public void testRoundRobinWithDownNode() { + for (int i = 0; i < partitions.size(); i++) { + int part = partitioner.partition(new ProducerRecord("test", value), cluster); + assertTrue("We should never choose a leader-less node in round robin", part >= 0 && part < 2); + + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/test/java/kafka/clients/producer/RecordSendTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/kafka/clients/producer/RecordSendTest.java b/clients/src/test/java/kafka/clients/producer/RecordSendTest.java index f8fd14b..804c57b 100644 --- a/clients/src/test/java/kafka/clients/producer/RecordSendTest.java +++ b/clients/src/test/java/kafka/clients/producer/RecordSendTest.java @@ -5,12 +5,14 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import kafka.clients.producer.internals.FutureRecordMetadata; import kafka.clients.producer.internals.ProduceRequestResult; import kafka.common.TopicPartition; -import kafka.common.errors.CorruptMessageException; -import kafka.common.errors.TimeoutException; +import kafka.common.errors.CorruptRecordException; import org.junit.Test; @@ -24,37 +26,37 @@ public class RecordSendTest { * Test that waiting on a request that never completes times out */ @Test - public void testTimeout() { + public void testTimeout() throws Exception { ProduceRequestResult request = new ProduceRequestResult(); - RecordSend send = new RecordSend(relOffset, request); - assertFalse("Request is not completed", send.completed()); + FutureRecordMetadata future = new FutureRecordMetadata(request, relOffset); + assertFalse("Request is not completed", future.isDone()); try { - send.await(5, TimeUnit.MILLISECONDS); + future.get(5, TimeUnit.MILLISECONDS); fail("Should have thrown exception."); } catch (TimeoutException e) { /* this is good */ } request.done(topicPartition, baseOffset, null); - assertTrue(send.completed()); - assertEquals(baseOffset + relOffset, send.offset()); + assertTrue(future.isDone()); + assertEquals(baseOffset + relOffset, future.get().offset()); } /** * Test that an asynchronous request will eventually throw the right exception */ - @Test(expected = CorruptMessageException.class) - public void testError() { - RecordSend send = new RecordSend(relOffset, asyncRequest(baseOffset, new CorruptMessageException(), 50L)); - send.await(); + @Test(expected = ExecutionException.class) + public void testError() throws Exception { + FutureRecordMetadata future = new FutureRecordMetadata(asyncRequest(baseOffset, new CorruptRecordException(), 50L), relOffset); + future.get(); } /** * Test that an asynchronous request will eventually return the right offset */ @Test - public void testBlocking() { - RecordSend send = new RecordSend(relOffset, asyncRequest(baseOffset, null, 50L)); - assertEquals(baseOffset + relOffset, send.offset()); + public void testBlocking() throws Exception { + FutureRecordMetadata future = new FutureRecordMetadata(asyncRequest(baseOffset, null, 50L), relOffset); + assertEquals(baseOffset + relOffset, future.get().offset()); } /* create a new request result that will be completed after the given timeout */ http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/test/java/kafka/clients/producer/SenderTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/kafka/clients/producer/SenderTest.java b/clients/src/test/java/kafka/clients/producer/SenderTest.java index 73f1aba..8788095 100644 --- a/clients/src/test/java/kafka/clients/producer/SenderTest.java +++ b/clients/src/test/java/kafka/clients/producer/SenderTest.java @@ -1,17 +1,15 @@ package kafka.clients.producer; -import static java.util.Arrays.asList; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.nio.ByteBuffer; +import java.util.concurrent.Future; import kafka.clients.producer.internals.Metadata; import kafka.clients.producer.internals.RecordAccumulator; import kafka.clients.producer.internals.Sender; import kafka.common.Cluster; -import kafka.common.Node; -import kafka.common.PartitionInfo; import kafka.common.TopicPartition; import kafka.common.metrics.Metrics; import kafka.common.network.NetworkReceive; @@ -24,6 +22,7 @@ import kafka.common.requests.RequestSend; import kafka.common.requests.ResponseHeader; import kafka.common.utils.MockTime; import kafka.test.MockSelector; +import kafka.test.TestUtils; import org.junit.Before; import org.junit.Test; @@ -34,11 +33,7 @@ public class SenderTest { private MockSelector selector = new MockSelector(time); private int batchSize = 16 * 1024; private Metadata metadata = new Metadata(0, Long.MAX_VALUE); - private Cluster cluster = new Cluster(asList(new Node(0, "localhost", 1969)), asList(new PartitionInfo("test", - 0, - 0, - new int[0], - new int[0]))); + private Cluster cluster = TestUtils.singletonCluster("test", 1); private Metrics metrics = new Metrics(time); private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, 0L, false, metrics, time); private Sender sender = new Sender(selector, metadata, this.accumulator, "", 1024 * 1024, 0L, (short) -1, 10000, time); @@ -51,7 +46,7 @@ public class SenderTest { @Test public void testSimple() throws Exception { TopicPartition tp = new TopicPartition("test", 0); - RecordSend send = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null); + Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null); sender.run(time.milliseconds()); assertEquals("We should have connected", 1, selector.connected().size()); selector.clear(); @@ -67,8 +62,8 @@ public class SenderTest { offset, Errors.NONE.code())); sender.run(time.milliseconds()); - assertTrue("Request should be completed", send.completed()); - assertEquals(offset, send.offset()); + assertTrue("Request should be completed", future.isDone()); + assertEquals(offset, future.get().offset()); } private NetworkReceive produceResponse(int correlation, int source, String topic, int part, long offset, int error) { http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/test/java/kafka/test/TestUtils.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/kafka/test/TestUtils.java b/clients/src/test/java/kafka/test/TestUtils.java index a2ef3a2..90c6850 100644 --- a/clients/src/test/java/kafka/test/TestUtils.java +++ b/clients/src/test/java/kafka/test/TestUtils.java @@ -1,10 +1,18 @@ package kafka.test; +import static java.util.Arrays.asList; + import java.io.File; import java.io.IOException; import java.net.ServerSocket; +import java.util.ArrayList; +import java.util.List; import java.util.Random; +import kafka.common.Cluster; +import kafka.common.Node; +import kafka.common.PartitionInfo; + /** * Helper functions for writing unit tests */ @@ -20,6 +28,20 @@ public class TestUtils { public static final Random seededRandom = new Random(192348092834L); public static final Random random = new Random(); + public static Cluster singletonCluster(String topic, int partitions) { + return clusterWith(1, topic, partitions); + } + + public static Cluster clusterWith(int nodes, String topic, int partitions) { + Node[] ns = new Node[nodes]; + for (int i = 0; i < nodes; i++) + ns[i] = new Node(0, "localhost", 1969); + List<PartitionInfo> parts = new ArrayList<PartitionInfo>(); + for (int i = 0; i < partitions; i++) + parts.add(new PartitionInfo(topic, i, ns[i % ns.length], ns, ns)); + return new Cluster(asList(ns), parts); + } + /** * Choose a number of random available ports */
