Repository: kafka Updated Branches: refs/heads/trunk 2270a7537 -> b6d326b08
http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 56281ee..695eaf6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -19,7 +19,6 @@ import org.apache.kafka.clients.KafkaClient; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.RequestCompletionHandler; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.NoOffsetForPartitionException; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; @@ -61,9 +60,6 @@ import java.util.Map; public class Fetcher<K, V> { private static final Logger log = LoggerFactory.getLogger(Fetcher.class); - private static final long EARLIEST_OFFSET_TIMESTAMP = -2L; - private static final long LATEST_OFFSET_TIMESTAMP = -1L; - private final KafkaClient client; @@ -72,23 +68,19 @@ public class Fetcher<K, V> { private final int maxWaitMs; private final int fetchSize; private final boolean checkCrcs; - private final long retryBackoffMs; private final Metadata metadata; private final FetchManagerMetrics sensors; private final SubscriptionState subscriptions; private final List<PartitionRecords<K, V>> records; - private final AutoOffsetResetStrategy offsetResetStrategy; private final Deserializer<K> keyDeserializer; private final Deserializer<V> valueDeserializer; public Fetcher(KafkaClient client, - long retryBackoffMs, int minBytes, int maxWaitMs, int fetchSize, boolean checkCrcs, - String offsetReset, Deserializer<K> keyDeserializer, Deserializer<V> 
valueDeserializer, Metadata metadata, @@ -102,17 +94,16 @@ public class Fetcher<K, V> { this.client = client; this.metadata = metadata; this.subscriptions = subscriptions; - this.retryBackoffMs = retryBackoffMs; this.minBytes = minBytes; this.maxWaitMs = maxWaitMs; this.fetchSize = fetchSize; this.checkCrcs = checkCrcs; - this.offsetResetStrategy = AutoOffsetResetStrategy.valueOf(offsetReset); this.keyDeserializer = keyDeserializer; this.valueDeserializer = valueDeserializer; this.records = new LinkedList<PartitionRecords<K, V>>(); + this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix, metricTags); } @@ -166,84 +157,76 @@ public class Fetcher<K, V> { } /** - * Reset offsets for the given partition using the offset reset strategy. - * - * @param partition The given partition that needs reset offset - * @throws org.apache.kafka.clients.consumer.NoOffsetForPartitionException If no offset reset strategy is defined - */ - public void resetOffset(TopicPartition partition) { - long timestamp; - if (this.offsetResetStrategy == AutoOffsetResetStrategy.EARLIEST) - timestamp = EARLIEST_OFFSET_TIMESTAMP; - else if (this.offsetResetStrategy == AutoOffsetResetStrategy.LATEST) - timestamp = LATEST_OFFSET_TIMESTAMP; - else - throw new NoOffsetForPartitionException("No offset is set and no reset policy is defined"); - - log.debug("Resetting offset for partition {} to {} offset.", partition, this.offsetResetStrategy.name() - .toLowerCase()); - this.subscriptions.seek(partition, offsetBefore(partition, timestamp)); - } - - /** * Fetch a single offset before the given timestamp for the partition. * * @param topicPartition The partition that needs fetching offset. * @param timestamp The timestamp for fetching offset. - * @return The offset of the message that is published before the given timestamp + * @return A response which can be polled to obtain the corresponding offset. 
*/ - public long offsetBefore(TopicPartition topicPartition, long timestamp) { - log.debug("Fetching offsets for partition {}.", topicPartition); + public RequestFuture<Long> listOffset(final TopicPartition topicPartition, long timestamp) { Map<TopicPartition, ListOffsetRequest.PartitionData> partitions = new HashMap<TopicPartition, ListOffsetRequest.PartitionData>(1); partitions.put(topicPartition, new ListOffsetRequest.PartitionData(timestamp, 1)); - while (true) { - long now = time.milliseconds(); - PartitionInfo info = metadata.fetch().partition(topicPartition); - if (info == null) { - metadata.add(topicPartition.topic()); - log.debug("Partition {} is unknown for fetching offset, wait for metadata refresh", topicPartition); - awaitMetadataUpdate(); - } else if (info.leader() == null) { - log.debug("Leader for partition {} unavailable for fetching offset, wait for metadata refresh", topicPartition); - awaitMetadataUpdate(); - } else if (this.client.ready(info.leader(), now)) { - Node node = info.leader(); - ListOffsetRequest request = new ListOffsetRequest(-1, partitions); - RequestSend send = new RequestSend(node.idString(), + long now = time.milliseconds(); + PartitionInfo info = metadata.fetch().partition(topicPartition); + if (info == null) { + metadata.add(topicPartition.topic()); + log.debug("Partition {} is unknown for fetching offset, wait for metadata refresh", topicPartition); + return RequestFuture.metadataRefreshNeeded(); + } else if (info.leader() == null) { + log.debug("Leader for partition {} unavailable for fetching offset, wait for metadata refresh", topicPartition); + return RequestFuture.metadataRefreshNeeded(); + } else if (this.client.ready(info.leader(), now)) { + final RequestFuture<Long> future = new RequestFuture<Long>(); + Node node = info.leader(); + ListOffsetRequest request = new ListOffsetRequest(-1, partitions); + RequestSend send = new RequestSend(node.idString(), this.client.nextRequestHeader(ApiKeys.LIST_OFFSETS), 
request.toStruct()); - ClientRequest clientRequest = new ClientRequest(now, true, send, null); - this.client.send(clientRequest); - List<ClientResponse> responses = this.client.completeAll(node.idString(), now); - if (responses.isEmpty()) - throw new IllegalStateException("This should not happen."); - ClientResponse response = responses.get(responses.size() - 1); - if (response.wasDisconnected()) { - awaitMetadataUpdate(); - } else { - ListOffsetResponse lor = new ListOffsetResponse(response.responseBody()); - short errorCode = lor.responseData().get(topicPartition).errorCode; - if (errorCode == Errors.NONE.code()) { - List<Long> offsets = lor.responseData().get(topicPartition).offsets; - if (offsets.size() != 1) - throw new IllegalStateException("This should not happen."); - long offset = offsets.get(0); - log.debug("Fetched offset {} for partition {}", offset, topicPartition); - return offset; - } else if (errorCode == Errors.NOT_LEADER_FOR_PARTITION.code() - || errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) { - log.warn("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.", - topicPartition); - awaitMetadataUpdate(); - } else { - log.error("Attempt to fetch offsets for partition {} failed due to: {}", - topicPartition, Errors.forCode(errorCode).exception().getMessage()); - awaitMetadataUpdate(); - } + RequestCompletionHandler completionHandler = new RequestCompletionHandler() { + @Override + public void onComplete(ClientResponse resp) { + handleListOffsetResponse(topicPartition, resp, future); } + }; + ClientRequest clientRequest = new ClientRequest(now, true, send, completionHandler); + this.client.send(clientRequest); + return future; + } else { + // We initiated a connect to the leader, but we need to poll to finish it. + return RequestFuture.pollNeeded(); + } + } + + /** + * Callback for the response of the list offset call above. 
+ * @param topicPartition The partition that was fetched + * @param clientResponse The response from the server. + */ + private void handleListOffsetResponse(TopicPartition topicPartition, + ClientResponse clientResponse, + RequestFuture<Long> future) { + if (clientResponse.wasDisconnected()) { + future.retryAfterMetadataRefresh(); + } else { + ListOffsetResponse lor = new ListOffsetResponse(clientResponse.responseBody()); + short errorCode = lor.responseData().get(topicPartition).errorCode; + if (errorCode == Errors.NONE.code()) { + List<Long> offsets = lor.responseData().get(topicPartition).offsets; + if (offsets.size() != 1) + throw new IllegalStateException("This should not happen."); + long offset = offsets.get(0); + log.debug("Fetched offset {} for partition {}", offset, topicPartition); + + future.complete(offset); + } else if (errorCode == Errors.NOT_LEADER_FOR_PARTITION.code() + || errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) { + log.warn("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.", + topicPartition); + future.retryAfterMetadataRefresh(); } else { - log.debug("Leader for partition {} is not ready, retry fetching offsets", topicPartition); - client.poll(this.retryBackoffMs, now); + log.error("Attempt to fetch offsets for partition {} failed due to: {}", + topicPartition, Errors.forCode(errorCode).exception().getMessage()); + future.retryAfterMetadataRefresh(); } } } @@ -257,8 +240,10 @@ public class Fetcher<K, V> { Map<Integer, Map<TopicPartition, FetchRequest.PartitionData>> fetchable = new HashMap<Integer, Map<TopicPartition, FetchRequest.PartitionData>>(); for (TopicPartition partition : subscriptions.assignedPartitions()) { Node node = cluster.leaderFor(partition); - // if there is a leader and no in-flight requests, issue a new fetch - if (node != null && this.client.inFlightRequestCount(node.idString()) == 0) { + if (node == null) { + metadata.requestUpdate(); + } else if 
(this.client.inFlightRequestCount(node.idString()) == 0) { + // if there is a leader and no in-flight requests, issue a new fetch Map<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node.id()); if (fetch == null) { fetch = new HashMap<TopicPartition, FetchRequest.PartitionData>(); @@ -327,7 +312,7 @@ public class Fetcher<K, V> { } else if (partition.errorCode == Errors.OFFSET_OUT_OF_RANGE.code()) { // TODO: this could be optimized by grouping all out-of-range partitions log.info("Fetch offset {} is out of range, resetting offset", subscriptions.fetched(tp)); - resetOffset(tp); + subscriptions.needOffsetReset(tp); } else if (partition.errorCode == Errors.UNKNOWN.code()) { log.warn("Unknown error fetching data for topic-partition {}", tp); } else { @@ -356,17 +341,6 @@ public class Fetcher<K, V> { return new ConsumerRecord<K, V>(partition.topic(), partition.partition(), offset, key, value); } - /* - * Request a metadata update and wait until it has occurred - */ - private void awaitMetadataUpdate() { - int version = this.metadata.requestUpdate(); - do { - long now = time.milliseconds(); - this.client.poll(this.retryBackoffMs, now); - } while (this.metadata.version() == version); - } - private static class PartitionRecords<K, V> { public long fetchOffset; public TopicPartition partition; @@ -379,9 +353,6 @@ public class Fetcher<K, V> { } } - private static enum AutoOffsetResetStrategy { - LATEST, EARLIEST, NONE - } private class FetchManagerMetrics { public final Metrics metrics; http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java index e7cfaaa..51eae19 100644 --- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java @@ -42,4 +42,14 @@ public final class Heartbeat { public long lastHeartbeatSend() { return this.lastHeartbeatSend; } + + public long timeToNextHeartbeat(long now) { + long timeSinceLastHeartbeat = now - lastHeartbeatSend; + + long hbInterval = timeout / HEARTBEATS_PER_SESSION_INTERVAL; + if (timeSinceLastHeartbeat > hbInterval) + return 0; + else + return hbInterval - timeSinceLastHeartbeat; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java new file mode 100644 index 0000000..13fc9af --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java @@ -0,0 +1,209 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +/** + * Result of an asynchronous request through {@link org.apache.kafka.clients.KafkaClient}. To get the + * result of the request, you must use poll using {@link org.apache.kafka.clients.KafkaClient#poll(long, long)} + * until {@link #isDone()} returns true. Typical usage might look like this: + * + * <pre> + * RequestFuture future = sendRequest(); + * while (!future.isDone()) { + * client.poll(timeout, now); + * } + * + * switch (future.outcome()) { + * case SUCCESS: + * // handle request success + * break; + * case NEED_RETRY: + * // retry after taking possible retry action + * break; + * case EXCEPTION: + * // handle exception + * } + * </pre> + * + * When {@link #isDone()} returns true, there are three possible outcomes (obtained through {@link #outcome()}): + * + * <ol> + * <li> {@link org.apache.kafka.clients.consumer.internals.RequestFuture.Outcome#SUCCESS}: If the request was + * successful, then you can use {@link #value()} to obtain the result.</li> + * <li> {@link org.apache.kafka.clients.consumer.internals.RequestFuture.Outcome#EXCEPTION}: If an unhandled exception + * was encountered, you can use {@link #exception()} to get it.</li> + * <li> {@link org.apache.kafka.clients.consumer.internals.RequestFuture.Outcome#NEED_RETRY}: The request may + * not have been successful, but the failure may be ephemeral and the caller just needs to try the request again. 
+ * In this case, use {@link #retryAction()} to determine what action should be taken (if any) before + * retrying.</li> + * </ol> + * + * @param <T> Return type of the result (Can be Void if there is no response) + */ +public class RequestFuture<T> { + public static final RequestFuture<Object> NEED_NEW_COORDINATOR = newRetryFuture(RetryAction.FIND_COORDINATOR); + public static final RequestFuture<Object> NEED_POLL = newRetryFuture(RetryAction.POLL); + public static final RequestFuture<Object> NEED_METADATA_REFRESH = newRetryFuture(RetryAction.REFRESH_METADATA); + + public enum RetryAction { + NOOP, // Retry immediately. + POLL, // Retry after calling poll (e.g. to finish a connection) + BACKOFF, // Retry after a delay + FIND_COORDINATOR, // Find a new coordinator before retrying + REFRESH_METADATA // Refresh metadata before retrying + } + + public enum Outcome { + SUCCESS, + NEED_RETRY, + EXCEPTION + } + + private Outcome outcome; + private RetryAction retryAction; + private T value; + private RuntimeException exception; + + /** + * Check whether the response is ready to be handled + * @return true if the response is ready, false otherwise + */ + public boolean isDone() { + return outcome != null; + } + + /** + * Get the value corresponding to this request (if it has one, as indicated by {@link #outcome()}). + * @return the value if it exists or null + */ + public T value() { + return value; + } + + /** + * Check if the request succeeded. + * @return true if a value is available, false otherwise + */ + public boolean succeeded() { + return outcome == Outcome.SUCCESS; + } + + /** + * Check if the request failed. + * @return true if the request failed (whether or not it can be retried) + */ + public boolean failed() { + return outcome != Outcome.SUCCESS; + } + + /** + * Return the retry action for this response (assuming {@link #succeeded()} has returned false). If the + * response is not ready or if there is no retryAction, null is returned.
+ * @return the retry action if it exists or null + */ + public RetryAction retryAction() { + return retryAction; + } + + /** + * Get the exception from a failed result. You should check that there is an exception + * with {@link #hasException()} before using this method. + * @return The exception if it exists or null + */ + public RuntimeException exception() { + return exception; + } + + /** + * Check whether there was an exception. + * @return true if this request failed with an exception + */ + public boolean hasException() { + return outcome == Outcome.EXCEPTION; + } + + /** + * Check the outcome of the future if it is ready. + * @return the outcome or null if the future is not finished + */ + public Outcome outcome() { + return outcome; + } + + /** + * The request failed, but should be retried using the provided retry action. + * @param retryAction The action that should be taken by the caller before retrying the request + */ + public void retry(RetryAction retryAction) { + this.outcome = Outcome.NEED_RETRY; + this.retryAction = retryAction; + } + + public void retryNow() { + retry(RetryAction.NOOP); + } + + public void retryAfterBackoff() { + retry(RetryAction.BACKOFF); + } + + public void retryWithNewCoordinator() { + retry(RetryAction.FIND_COORDINATOR); + } + + public void retryAfterMetadataRefresh() { + retry(RetryAction.REFRESH_METADATA); + } + + /** + * Complete the request successfully. After this call, {@link #succeeded()} will return true + * and the value can be obtained through {@link #value()}. + * @param value corresponding value (or null if there is none) + */ + public void complete(T value) { + this.outcome = Outcome.SUCCESS; + this.value = value; + } + + /** + * Raise an exception. The request will be marked as failed, and the caller can either + * handle the exception or throw it.
+ * @param e The exception that caused the request to fail + */ + public void raise(RuntimeException e) { + this.outcome = Outcome.EXCEPTION; + this.exception = e; + } + + private static <T> RequestFuture<T> newRetryFuture(RetryAction retryAction) { + RequestFuture<T> result = new RequestFuture<T>(); + result.retry(retryAction); + return result; + } + + @SuppressWarnings("unchecked") + public static <T> RequestFuture<T> pollNeeded() { + return (RequestFuture<T>) NEED_POLL; + } + + @SuppressWarnings("unchecked") + public static <T> RequestFuture<T> metadataRefreshNeeded() { + return (RequestFuture<T>) NEED_METADATA_REFRESH; + } + + @SuppressWarnings("unchecked") + public static <T> RequestFuture<T> newCoordinatorNeeded() { + return (RequestFuture<T>) NEED_NEW_COORDINATOR; + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index cee7541..6837453 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -12,14 +12,15 @@ */ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.TopicPartition; + import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.kafka.common.TopicPartition; - /** * A class for tracking the topics, partitions, and offsets for the consumer */ @@ -49,7 +50,14 @@ public class SubscriptionState { /* do we need to request the latest committed offsets from the coordinator?
*/ private boolean needsFetchCommittedOffsets; - public SubscriptionState() { + /* Partitions that need to be reset before fetching */ + private Map<TopicPartition, OffsetResetStrategy> resetPartitions; + + /* Default offset reset strategy */ + private OffsetResetStrategy offsetResetStrategy; + + public SubscriptionState(OffsetResetStrategy offsetResetStrategy) { + this.offsetResetStrategy = offsetResetStrategy; this.subscribedTopics = new HashSet<String>(); this.subscribedPartitions = new HashSet<TopicPartition>(); this.assignedPartitions = new HashSet<TopicPartition>(); @@ -58,6 +66,7 @@ public class SubscriptionState { this.committed = new HashMap<TopicPartition, Long>(); this.needsPartitionAssignment = false; this.needsFetchCommittedOffsets = true; // initialize to true for the consumers to fetch offset upon starting up + this.resetPartitions = new HashMap<TopicPartition, OffsetResetStrategy>(); } public void subscribe(String topic) { @@ -102,12 +111,14 @@ public class SubscriptionState { this.committed.remove(tp); this.fetched.remove(tp); this.consumed.remove(tp); + this.resetPartitions.remove(tp); } public void clearAssignment() { this.assignedPartitions.clear(); this.committed.clear(); this.fetched.clear(); + this.consumed.clear(); this.needsPartitionAssignment = !subscribedTopics().isEmpty(); } @@ -145,6 +156,7 @@ public class SubscriptionState { public void seek(TopicPartition tp, long offset) { fetched(tp, offset); consumed(tp, offset); + resetPartitions.remove(tp); } public Set<TopicPartition> assignedPartitions() { @@ -169,6 +181,28 @@ public class SubscriptionState { return this.consumed; } + public void needOffsetReset(TopicPartition partition, OffsetResetStrategy offsetResetStrategy) { + this.resetPartitions.put(partition, offsetResetStrategy); + this.fetched.remove(partition); + this.consumed.remove(partition); + } + + public void needOffsetReset(TopicPartition partition) { + needOffsetReset(partition, offsetResetStrategy); + } + + public boolean 
isOffsetResetNeeded(TopicPartition partition) { + return resetPartitions.containsKey(partition); + } + + public boolean isOffsetResetNeeded() { + return !resetPartitions.isEmpty(); + } + + public OffsetResetStrategy resetStrategy(TopicPartition partition) { + return resetPartitions.get(partition); + } + public boolean hasAllFetchPositions() { return this.fetched.size() >= this.assignedPartitions.size(); } @@ -192,4 +226,5 @@ public class SubscriptionState { this.needsPartitionAssignment = false; } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/main/java/org/apache/kafka/common/utils/Utils.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index f73eedb..af9993c 100755 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -182,6 +182,21 @@ public class Utils { } /** + * Get the minimum of some long values. + * @param first Used to ensure at least one value + * @param rest The rest of longs to compare + * @return The minimum of all passed argument. + */ + public static long min(long first, long ... 
rest) { + long min = first; + for (int i = 0; i < rest.length; i++) { + if (rest[i] < min) + min = rest[i]; + } + return min; + } + + /** * Get the length for UTF8-encoding a string without encoding it first * * @param s The string to calculate the length for http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java index 677edd3..26b6b40 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java @@ -25,7 +25,7 @@ import org.junit.Test; public class MockConsumerTest { - private MockConsumer<String, String> consumer = new MockConsumer<String, String>(); + private MockConsumer<String, String> consumer = new MockConsumer<String, String>(OffsetResetStrategy.EARLIEST); @Test public void testSimpleMock() { http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java index 1454ab7..613b192 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java @@ -17,10 +17,11 @@ package org.apache.kafka.clients.consumer.internals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import 
org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; @@ -49,24 +50,20 @@ public class CoordinatorTest { private String topicName = "test"; private String groupId = "test-group"; private TopicPartition tp = new TopicPartition(topicName, 0); - private long retryBackoffMs = 0L; private int sessionTimeoutMs = 10; private String rebalanceStrategy = "not-matter"; private MockTime time = new MockTime(); private MockClient client = new MockClient(time); - private Metadata metadata = new Metadata(0, Long.MAX_VALUE); private Cluster cluster = TestUtils.singletonCluster(topicName, 1); private Node node = cluster.nodes().get(0); - private SubscriptionState subscriptions = new SubscriptionState(); + private SubscriptionState subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST); private Metrics metrics = new Metrics(time); private Map<String, String> metricTags = new LinkedHashMap<String, String>(); private Coordinator coordinator = new Coordinator(client, groupId, - retryBackoffMs, sessionTimeoutMs, rebalanceStrategy, - metadata, subscriptions, metrics, "consumer" + groupId, @@ -75,13 +72,14 @@ public class CoordinatorTest { @Before public void setup() { - metadata.update(cluster, time.milliseconds()); client.setNode(node); } @Test public void testNormalHeartbeat() { client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.discoverConsumerCoordinator(); + client.poll(0, time.milliseconds()); // normal heartbeat time.sleep(sessionTimeoutMs); @@ -94,6 +92,8 @@ public class CoordinatorTest { @Test public void testCoordinatorNotAvailable() { client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.discoverConsumerCoordinator(); + client.poll(0, time.milliseconds()); // 
consumer_coordinator_not_available will mark coordinator as unknown time.sleep(sessionTimeoutMs); @@ -108,6 +108,8 @@ public class CoordinatorTest { @Test public void testNotCoordinator() { client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.discoverConsumerCoordinator(); + client.poll(0, time.milliseconds()); // not_coordinator will mark coordinator as unknown time.sleep(sessionTimeoutMs); @@ -122,6 +124,8 @@ public class CoordinatorTest { @Test public void testIllegalGeneration() { client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.discoverConsumerCoordinator(); + client.poll(0, time.milliseconds()); // illegal_generation will cause re-partition subscriptions.subscribe(topicName); @@ -139,6 +143,8 @@ public class CoordinatorTest { @Test public void testCoordinatorDisconnect() { client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.discoverConsumerCoordinator(); + client.poll(0, time.milliseconds()); // coordinator disconnect will mark coordinator as unknown time.sleep(sessionTimeoutMs); @@ -152,39 +158,67 @@ public class CoordinatorTest { @Test public void testNormalJoinGroup() { + subscriptions.subscribe(topicName); + subscriptions.needReassignment(); + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.discoverConsumerCoordinator(); + client.poll(0, time.milliseconds()); // normal join group client.prepareResponse(joinGroupResponse(1, "consumer", Collections.singletonList(tp), Errors.NONE.code())); - assertEquals(Collections.singletonList(tp), - coordinator.assignPartitions(Collections.singletonList(topicName), time.milliseconds())); - assertEquals(0, client.inFlightRequestCount()); + coordinator.assignPartitions(time.milliseconds()); + client.poll(0, time.milliseconds()); + + assertFalse(subscriptions.partitionAssignmentNeeded()); + assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions()); } 
@Test public void testReJoinGroup() { + subscriptions.subscribe(topicName); + subscriptions.needReassignment(); + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.discoverConsumerCoordinator(); + client.poll(0, time.milliseconds()); + assertTrue(subscriptions.partitionAssignmentNeeded()); // diconnected from original coordinator will cause re-discover and join again client.prepareResponse(joinGroupResponse(1, "consumer", Collections.singletonList(tp), Errors.NONE.code()), true); + coordinator.assignPartitions(time.milliseconds()); + client.poll(0, time.milliseconds()); + assertTrue(subscriptions.partitionAssignmentNeeded()); + + // rediscover the coordinator client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.discoverConsumerCoordinator(); + client.poll(0, time.milliseconds()); + + // try assigning partitions again client.prepareResponse(joinGroupResponse(1, "consumer", Collections.singletonList(tp), Errors.NONE.code())); - assertEquals(Collections.singletonList(tp), - coordinator.assignPartitions(Collections.singletonList(topicName), time.milliseconds())); - assertEquals(0, client.inFlightRequestCount()); + coordinator.assignPartitions(time.milliseconds()); + client.poll(0, time.milliseconds()); + assertFalse(subscriptions.partitionAssignmentNeeded()); + assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions()); } @Test public void testCommitOffsetNormal() { client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.discoverConsumerCoordinator(); + client.poll(0, time.milliseconds()); - // sync commit + // With success flag client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code()))); - coordinator.commitOffsets(Collections.singletonMap(tp, 100L), true, time.milliseconds()); + RequestFuture<Void> result = coordinator.commitOffsets(Collections.singletonMap(tp, 100L), time.milliseconds()); + assertEquals(1, 
client.poll(0, time.milliseconds()).size()); + assertTrue(result.isDone()); + assertTrue(result.succeeded()); - // async commit - coordinator.commitOffsets(Collections.singletonMap(tp, 100L), false, time.milliseconds()); + // Without success flag + coordinator.commitOffsets(Collections.singletonMap(tp, 100L), time.milliseconds()); client.respond(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code()))); assertEquals(1, client.poll(0, time.milliseconds()).size()); } @@ -192,34 +226,55 @@ public class CoordinatorTest { @Test public void testCommitOffsetError() { client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.discoverConsumerCoordinator(); + client.poll(0, time.milliseconds()); // async commit with coordinator not available client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()))); - coordinator.commitOffsets(Collections.singletonMap(tp, 100L), false, time.milliseconds()); + coordinator.commitOffsets(Collections.singletonMap(tp, 100L), time.milliseconds()); assertEquals(1, client.poll(0, time.milliseconds()).size()); assertTrue(coordinator.coordinatorUnknown()); // resume client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.discoverConsumerCoordinator(); + client.poll(0, time.milliseconds()); // async commit with not coordinator client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_CONSUMER.code()))); - coordinator.commitOffsets(Collections.singletonMap(tp, 100L), false, time.milliseconds()); + coordinator.commitOffsets(Collections.singletonMap(tp, 100L), time.milliseconds()); assertEquals(1, client.poll(0, time.milliseconds()).size()); assertTrue(coordinator.coordinatorUnknown()); // resume client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.discoverConsumerCoordinator(); + client.poll(0, time.milliseconds()); // sync commit 
with not_coordinator client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_CONSUMER.code()))); client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code()))); - coordinator.commitOffsets(Collections.singletonMap(tp, 100L), true, time.milliseconds()); + RequestFuture<Void> result = coordinator.commitOffsets(Collections.singletonMap(tp, 100L), time.milliseconds()); + assertEquals(1, client.poll(0, time.milliseconds()).size()); + assertTrue(result.isDone()); + assertEquals(RequestFuture.RetryAction.FIND_COORDINATOR, result.retryAction()); // sync commit with coordinator disconnected client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())), true); - client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code()))); - coordinator.commitOffsets(Collections.singletonMap(tp, 100L), true, time.milliseconds()); + result = coordinator.commitOffsets(Collections.singletonMap(tp, 100L), time.milliseconds()); + + assertEquals(0, client.poll(0, time.milliseconds()).size()); + assertTrue(result.isDone()); + assertEquals(RequestFuture.RetryAction.FIND_COORDINATOR, result.retryAction()); + + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.discoverConsumerCoordinator(); + client.poll(0, time.milliseconds()); + + result = coordinator.commitOffsets(Collections.singletonMap(tp, 100L), time.milliseconds()); + assertEquals(1, client.poll(0, time.milliseconds()).size()); + assertTrue(result.isDone()); + assertTrue(result.succeeded()); } @@ -227,33 +282,70 @@ public class CoordinatorTest { public void testFetchOffset() { client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.discoverConsumerCoordinator(); + client.poll(0, 
time.milliseconds()); // normal fetch client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L)); - assertEquals(100L, (long) coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()).get(tp)); + RequestFuture<Map<TopicPartition, Long>> result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()); + client.poll(0, time.milliseconds()); + assertTrue(result.isDone()); + assertEquals(100L, (long) result.value().get(tp)); // fetch with loading in progress client.prepareResponse(offsetFetchResponse(tp, Errors.OFFSET_LOAD_IN_PROGRESS.code(), "", 100L)); client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L)); - assertEquals(100L, (long) coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()).get(tp)); + + result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()); + client.poll(0, time.milliseconds()); + assertTrue(result.isDone()); + assertTrue(result.failed()); + assertEquals(RequestFuture.RetryAction.BACKOFF, result.retryAction()); + + result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()); + client.poll(0, time.milliseconds()); + assertTrue(result.isDone()); + assertEquals(100L, (long) result.value().get(tp)); // fetch with not coordinator client.prepareResponse(offsetFetchResponse(tp, Errors.NOT_COORDINATOR_FOR_CONSUMER.code(), "", 100L)); client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L)); - assertEquals(100L, (long) coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()).get(tp)); + + result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()); + client.poll(0, time.milliseconds()); + assertTrue(result.isDone()); + assertTrue(result.failed()); + assertEquals(RequestFuture.RetryAction.FIND_COORDINATOR, result.retryAction()); + + coordinator.discoverConsumerCoordinator(); + 
client.poll(0, time.milliseconds()); + + result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()); + client.poll(0, time.milliseconds()); + assertTrue(result.isDone()); + assertEquals(100L, (long) result.value().get(tp)); // fetch with no fetchable offsets client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", -1L)); - assertEquals(0, coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()).size()); + result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()); + client.poll(0, time.milliseconds()); + assertTrue(result.isDone()); + assertTrue(result.value().isEmpty()); // fetch with offset topic unknown client.prepareResponse(offsetFetchResponse(tp, Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), "", 100L)); - assertEquals(0, coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()).size()); + result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()); + client.poll(0, time.milliseconds()); + assertTrue(result.isDone()); + assertTrue(result.value().isEmpty()); // fetch with offset -1 client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", -1L)); - assertEquals(0, coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()).size()); + result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()); + client.poll(0, time.milliseconds()); + assertTrue(result.isDone()); + assertTrue(result.value().isEmpty()); } private Struct consumerMetadataResponse(Node node, short error) { http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 4195410..405efdc 100644 --- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -16,11 +16,10 @@ */ package org.apache.kafka.clients.consumer.internals; -import static org.junit.Assert.assertEquals; - import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; @@ -30,10 +29,11 @@ import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.requests.FetchResponse; -import org.apache.kafka.common.requests.ListOffsetResponse; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.test.TestUtils; +import org.junit.Before; +import org.junit.Test; import java.nio.ByteBuffer; import java.util.Collections; @@ -41,37 +41,33 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import org.junit.Before; -import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class FetcherTest { private String topicName = "test"; private String groupId = "test-group"; private TopicPartition tp = new TopicPartition(topicName, 0); - private long retryBackoffMs = 0L; private int minBytes = 1; private int maxWaitMs = 0; private int fetchSize = 1000; - private String offsetReset = "EARLIEST"; private MockTime time = new MockTime(); private MockClient client = new MockClient(time); private Metadata metadata = new Metadata(0, Long.MAX_VALUE); private Cluster cluster = TestUtils.singletonCluster(topicName, 1); private Node node = 
cluster.nodes().get(0); - private SubscriptionState subscriptions = new SubscriptionState(); + private SubscriptionState subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST); private Metrics metrics = new Metrics(time); private Map<String, String> metricTags = new LinkedHashMap<String, String>(); private MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE); private Fetcher<byte[], byte[]> fetcher = new Fetcher<byte[], byte[]>(client, - retryBackoffMs, minBytes, maxWaitMs, fetchSize, true, // check crc - offsetReset, new ByteArrayDeserializer(), new ByteArrayDeserializer(), metadata, @@ -140,11 +136,11 @@ public class FetcherTest { subscriptions.fetched(tp, 5); fetcher.initFetches(cluster, time.milliseconds()); client.respond(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L)); - client.prepareResponse(listOffsetResponse(Collections.singletonList(0L), Errors.NONE.code())); client.poll(0, time.milliseconds()); + assertTrue(subscriptions.isOffsetResetNeeded(tp)); assertEquals(0, fetcher.fetchedRecords().size()); - assertEquals(0L, (long) subscriptions.fetched(tp)); - assertEquals(0L, (long) subscriptions.consumed(tp)); + assertEquals(null, subscriptions.fetched(tp)); + assertEquals(null, subscriptions.consumed(tp)); } @Test @@ -157,11 +153,11 @@ public class FetcherTest { // fetch with out of range fetcher.initFetches(cluster, time.milliseconds()); client.respond(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L)); - client.prepareResponse(listOffsetResponse(Collections.singletonList(0L), Errors.NONE.code())); client.poll(0, time.milliseconds()); + assertTrue(subscriptions.isOffsetResetNeeded(tp)); assertEquals(0, fetcher.fetchedRecords().size()); - assertEquals(0L, (long) subscriptions.fetched(tp)); - assertEquals(0L, (long) subscriptions.consumed(tp)); + assertEquals(null, subscriptions.fetched(tp)); + assertEquals(null, subscriptions.consumed(tp)); } 
private Struct fetchResponse(ByteBuffer buffer, short error, long hw) { @@ -169,9 +165,5 @@ public class FetcherTest { return response.toStruct(); } - private Struct listOffsetResponse(List<Long> offsets, short error) { - ListOffsetResponse response = new ListOffsetResponse(Collections.singletonMap(tp, new ListOffsetResponse.PartitionData(error, offsets))); - return response.toStruct(); - } } http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java index ecc78ce..ee1ede0 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.MockTime; import org.junit.Test; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -42,4 +43,12 @@ public class HeartbeatTest { time.sleep(timeout / (2 * Heartbeat.HEARTBEATS_PER_SESSION_INTERVAL)); assertFalse(heartbeat.shouldHeartbeat(time.milliseconds())); } + + @Test + public void testTimeToNextHeartbeat() { + heartbeat.sentHeartbeat(0); + assertEquals(100, heartbeat.timeToNextHeartbeat(0)); + assertEquals(0, heartbeat.timeToNextHeartbeat(100)); + assertEquals(0, heartbeat.timeToNextHeartbeat(200)); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java index e000cf8..319751c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java @@ -22,12 +22,13 @@ import static java.util.Arrays.asList; import java.util.Collections; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.TopicPartition; import org.junit.Test; public class SubscriptionStateTest { - private final SubscriptionState state = new SubscriptionState(); + private final SubscriptionState state = new SubscriptionState(OffsetResetStrategy.EARLIEST); private final TopicPartition tp0 = new TopicPartition("test", 0); private final TopicPartition tp1 = new TopicPartition("test", 1); @@ -43,7 +44,21 @@ public class SubscriptionStateTest { assertTrue(state.assignedPartitions().isEmpty()); assertAllPositions(tp0, null); } - + + @Test + public void partitionReset() { + state.subscribe(tp0); + state.seek(tp0, 5); + assertEquals(5L, (long) state.fetched(tp0)); + assertEquals(5L, (long) state.consumed(tp0)); + state.needOffsetReset(tp0); + assertTrue(state.isOffsetResetNeeded()); + assertTrue(state.isOffsetResetNeeded(tp0)); + assertEquals(null, state.fetched(tp0)); + assertEquals(null, state.consumed(tp0)); + } + + @Test public void topicSubscription() { state.subscribe("test"); assertEquals(1, state.subscribedTopics().size()); http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java index 2ebe3c2..e7951d8 100755 --- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java +++ 
b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java @@ -100,4 +100,12 @@ public class UtilsTest { buffer = ByteBuffer.wrap(myvar).asReadOnlyBuffer(); this.subTest(buffer); } + + @Test + public void testMin() { + assertEquals(1, Utils.min(1)); + assertEquals(1, Utils.min(1, 2, 3)); + assertEquals(1, Utils.min(2, 1, 3)); + assertEquals(1, Utils.min(2, 3, 1)); + } } \ No newline at end of file