Repository: kafka Updated Branches: refs/heads/trunk ff5fc9dd1 -> a47bfbcae
http://git-wip-us.apache.org/repos/asf/kafka/blob/a47bfbca/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index f823b09..80c65df 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.connect.runtime; +import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; @@ -27,6 +28,8 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; +import org.apache.kafka.connect.runtime.MockConnectMetrics.MockMetricsReporter; import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader; import org.apache.kafka.connect.runtime.isolation.PluginClassLoader; import org.apache.kafka.connect.runtime.isolation.Plugins; @@ -43,7 +46,6 @@ import org.apache.kafka.connect.util.MockTime; import org.apache.kafka.connect.util.ThreadedTest; import org.easymock.Capture; import org.easymock.EasyMock; -import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -76,7 +78,6 @@ public class WorkerTest extends ThreadedTest { private WorkerConfig config; private Worker worker; - private ConnectMetrics metrics; @Mock private Plugins plugins; @@ -103,17 +104,12 @@ public class WorkerTest extends ThreadedTest { workerProps.put("internal.key.converter.schemas.enable", "false"); workerProps.put("internal.value.converter.schemas.enable", "false"); workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets"); + workerProps.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName()); config = new StandaloneConfig(workerProps); - metrics = new MockConnectMetrics(); PowerMock.mockStatic(Plugins.class); } - @After - public void tearDown() { - if (metrics != null) metrics.stop(); - } - @Test public void testStartAndStopConnector() throws Exception { expectConverters(); @@ -134,6 +130,8 @@ public class WorkerTest extends ThreadedTest { props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID); props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, WorkerTestConnector.class.getName()); + EasyMock.expect(connector.version()).andReturn("1.0"); + EasyMock.expect(plugins.compareAndSwapLoaders(connector)) .andReturn(delegatingLoader) .times(2); @@ -171,10 +169,15 @@ public class WorkerTest extends ThreadedTest { } catch (ConnectException e) { // expected } + assertStatistics(worker, 1, 0); + assertStartupStatistics(worker, 1, 0, 0, 0); worker.stopConnector(CONNECTOR_ID); + assertStatistics(worker, 0, 0); + assertStartupStatistics(worker, 1, 0, 0, 0); assertEquals(Collections.emptySet(), worker.connectorNames()); // Nothing should be left, so this should effectively be a nop worker.stop(); + assertStatistics(worker, 0, 0); PowerMock.verifyAll(); } @@ -210,11 +213,17 @@ public class WorkerTest extends ThreadedTest { worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore); worker.start(); + assertStatistics(worker, 0, 0); assertFalse(worker.startConnector(CONNECTOR_ID, props, ctx, connectorStatusListener, TargetState.STARTED)); + assertStartupStatistics(worker, 1, 1, 0, 0); assertEquals(Collections.emptySet(), worker.connectorNames()); + assertStatistics(worker, 0, 0); + assertStartupStatistics(worker, 1, 1, 0, 0); assertFalse(worker.stopConnector(CONNECTOR_ID)); + assertStatistics(worker, 0, 0); + assertStartupStatistics(worker, 1, 1, 0, 0); PowerMock.verifyAll(); } @@ -238,6 +247,7 @@ public class WorkerTest extends ThreadedTest { props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID); props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "WorkerTestConnector"); + EasyMock.expect(connector.version()).andReturn("1.0"); EasyMock.expect(plugins.compareAndSwapLoaders(connector)) .andReturn(delegatingLoader) .times(2); @@ -267,14 +277,21 @@ public class WorkerTest extends ThreadedTest { worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore); worker.start(); + assertStatistics(worker, 0, 0); assertEquals(Collections.emptySet(), worker.connectorNames()); worker.startConnector(CONNECTOR_ID, props, ctx, connectorStatusListener, TargetState.STARTED); assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames()); + assertStatistics(worker, 1, 0); + assertStartupStatistics(worker, 1, 0, 0, 0); worker.stopConnector(CONNECTOR_ID); + assertStatistics(worker, 0, 0); + assertStartupStatistics(worker, 1, 0, 0, 0); assertEquals(Collections.emptySet(), worker.connectorNames()); // Nothing should be left, so this should effectively be a nop worker.stop(); + assertStatistics(worker, 0, 0); + assertStartupStatistics(worker, 1, 0, 0, 0); PowerMock.verifyAll(); } @@ -298,6 +315,7 @@ public class WorkerTest extends ThreadedTest { props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID); props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "WorkerTest"); + EasyMock.expect(connector.version()).andReturn("1.0"); EasyMock.expect(plugins.compareAndSwapLoaders(connector)) .andReturn(delegatingLoader) .times(2); @@ -327,14 +345,18 @@ public class WorkerTest extends ThreadedTest { worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore); worker.start(); + assertStatistics(worker, 0, 0); assertEquals(Collections.emptySet(), worker.connectorNames()); worker.startConnector(CONNECTOR_ID, props, ctx, connectorStatusListener, TargetState.STARTED); assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames()); + assertStatistics(worker, 1, 0); worker.stopConnector(CONNECTOR_ID); + assertStatistics(worker, 0, 0); assertEquals(Collections.emptySet(), worker.connectorNames()); // Nothing should be left, so this should effectively be a nop worker.stop(); + assertStatistics(worker, 0, 0); PowerMock.verifyAll(); } @@ -374,6 +396,7 @@ public class WorkerTest extends ThreadedTest { props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID); props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, WorkerTestConnector.class.getName()); + EasyMock.expect(connector.version()).andReturn("1.0"); EasyMock.expect(plugins.compareAndSwapLoaders(connector)) .andReturn(delegatingLoader) .times(3); @@ -409,8 +432,10 @@ public class WorkerTest extends ThreadedTest { worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore); worker.start(); + assertStatistics(worker, 0, 0); assertEquals(Collections.emptySet(), worker.connectorNames()); worker.startConnector(CONNECTOR_ID, props, ctx, connectorStatusListener, TargetState.STARTED); + assertStatistics(worker, 1, 0); assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames()); try { worker.startConnector(CONNECTOR_ID, props, ctx, connectorStatusListener, TargetState.STARTED); @@ -426,10 +451,15 @@ public class WorkerTest extends ThreadedTest { assertEquals(2, taskConfigs.size()); assertEquals(expectedTaskProps, taskConfigs.get(0)); assertEquals(expectedTaskProps, taskConfigs.get(1)); + assertStatistics(worker, 1, 0); + assertStartupStatistics(worker, 1, 0, 0, 0); worker.stopConnector(CONNECTOR_ID); + assertStatistics(worker, 0, 0); + assertStartupStatistics(worker, 1, 0, 0, 0); assertEquals(Collections.emptySet(), worker.connectorNames()); // Nothing should be left, so this should effectively be a nop worker.stop(); + assertStatistics(worker, 0, 0); PowerMock.verifyAll(); } @@ -507,13 +537,21 @@ public class WorkerTest extends ThreadedTest { worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore); worker.start(); + assertStatistics(worker, 0, 0); + assertStartupStatistics(worker, 0, 0, 0, 0); assertEquals(Collections.emptySet(), worker.taskIds()); worker.startTask(TASK_ID, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED); + assertStatistics(worker, 0, 1); + assertStartupStatistics(worker, 0, 0, 1, 0); assertEquals(new HashSet<>(Arrays.asList(TASK_ID)), worker.taskIds()); worker.stopAndAwaitTask(TASK_ID); + assertStatistics(worker, 0, 0); + assertStartupStatistics(worker, 0, 0, 1, 0); assertEquals(Collections.emptySet(), worker.taskIds()); // Nothing should be left, so this should effectively be a nop worker.stop(); + assertStatistics(worker, 0, 0); + assertStartupStatistics(worker, 0, 0, 1, 0); PowerMock.verifyAll(); } @@ -550,9 +588,14 @@ public class WorkerTest extends ThreadedTest { worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore); worker.start(); + assertStatistics(worker, 0, 0); + assertStartupStatistics(worker, 0, 0, 0, 0); assertFalse(worker.startTask(TASK_ID, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED)); + assertStartupStatistics(worker, 0, 0, 1, 1); + assertStatistics(worker, 0, 0); + assertStartupStatistics(worker, 0, 0, 1, 1); assertEquals(Collections.emptySet(), worker.taskIds()); PowerMock.verifyAll(); @@ -632,8 +675,11 @@ public class WorkerTest extends ThreadedTest { worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore); worker.start(); + assertStatistics(worker, 0, 0); worker.startTask(TASK_ID, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED); + assertStatistics(worker, 0, 1); worker.stop(); + assertStatistics(worker, 0, 0); PowerMock.verifyAll(); } @@ -712,6 +758,7 @@ public class WorkerTest extends ThreadedTest { worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore); worker.start(); + assertStatistics(worker, 0, 0); assertEquals(Collections.emptySet(), worker.taskIds()); Map<String, String> connProps = anyConnectorConfigMap(); connProps.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, TestConverter.class.getName()); @@ -719,11 +766,14 @@ public class WorkerTest extends ThreadedTest { connProps.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, TestConverter.class.getName()); connProps.put("value.converter.extra.config", "bar"); worker.startTask(TASK_ID, connProps, origProps, taskStatusListener, TargetState.STARTED); + assertStatistics(worker, 0, 1); assertEquals(new HashSet<>(Arrays.asList(TASK_ID)), worker.taskIds()); worker.stopAndAwaitTask(TASK_ID); + assertStatistics(worker, 0, 0); assertEquals(Collections.emptySet(), worker.taskIds()); // Nothing should be left, so this should effectively be a nop worker.stop(); + assertStatistics(worker, 0, 0); // Validate extra configs got passed through to overridden converters assertEquals("foo", keyConverter.getValue().configs.get("extra.config")); @@ -732,6 +782,41 @@ public class WorkerTest extends ThreadedTest { PowerMock.verifyAll(); } + private void assertStatistics(Worker worker, int connectors, int tasks) { + MetricGroup workerMetrics = worker.workerMetricsGroup().metricGroup(); + assertEquals(connectors, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), workerMetrics, "connector-count"), 0.0001d); + assertEquals(tasks, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), workerMetrics, "task-count"), 0.0001d); + assertEquals(tasks, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), workerMetrics, "task-count"), 0.0001d); + } + + private void assertStartupStatistics(Worker worker, int connectorStartupAttempts, int connectorStartupFailures, int taskStartupAttempts, int taskStartupFailures) { + double connectStartupSuccesses = connectorStartupAttempts - connectorStartupFailures; + double taskStartupSuccesses = taskStartupAttempts - taskStartupFailures; + double connectStartupSuccessPct = 0.0d; + double connectStartupFailurePct = 0.0d; + double taskStartupSuccessPct = 0.0d; + double taskStartupFailurePct = 0.0d; + if (connectorStartupAttempts != 0) { + connectStartupSuccessPct = connectStartupSuccesses / connectorStartupAttempts; + connectStartupFailurePct = (double) connectorStartupFailures / connectorStartupAttempts; + } + if (taskStartupAttempts != 0) { + taskStartupSuccessPct = taskStartupSuccesses / taskStartupAttempts; + taskStartupFailurePct = (double) taskStartupFailures / taskStartupAttempts; + } + MetricGroup workerMetrics = worker.workerMetricsGroup().metricGroup(); + assertEquals(connectorStartupAttempts, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), workerMetrics, "connector-startup-attempts-total"), 0.0001d); + assertEquals(connectStartupSuccesses, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), workerMetrics, "connector-startup-success-total"), 0.0001d); + assertEquals(connectorStartupFailures, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), workerMetrics, "connector-startup-failure-total"), 0.0001d); + assertEquals(connectStartupSuccessPct, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), workerMetrics, "connector-startup-success-percentage"), 0.0001d); + assertEquals(connectStartupFailurePct, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), workerMetrics, "connector-startup-failure-percentage"), 0.0001d); + assertEquals(taskStartupAttempts, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), workerMetrics, "task-startup-attempts-total"), 0.0001d); + assertEquals(taskStartupSuccesses, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), workerMetrics, "task-startup-success-total"), 0.0001d); + assertEquals(taskStartupFailures, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), workerMetrics, "task-startup-failure-total"), 0.0001d); + assertEquals(taskStartupSuccessPct, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), workerMetrics, "task-startup-success-percentage"), 0.0001d); + assertEquals(taskStartupFailurePct, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), workerMetrics, "task-startup-failure-percentage"), 0.0001d); + } + private void expectStartStorage() { offsetBackingStore.configure(EasyMock.anyObject(WorkerConfig.class)); EasyMock.expectLastCall(); http://git-wip-us.apache.org/repos/asf/kafka/blob/a47bfbca/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index 40d0e2b..7483261 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -25,13 +25,16 @@ import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.ConnectorContext; import org.apache.kafka.connect.errors.AlreadyExistsException; import org.apache.kafka.connect.errors.NotFoundException; +import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.Herder; +import org.apache.kafka.connect.runtime.MockConnectMetrics; import org.apache.kafka.connect.runtime.SinkConnectorConfig; import org.apache.kafka.connect.runtime.TargetState; import org.apache.kafka.connect.runtime.TaskConfig; import org.apache.kafka.connect.runtime.Worker; import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.runtime.distributed.DistributedHerder.HerderMetrics; import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader; import org.apache.kafka.connect.runtime.isolation.PluginClassLoader; import org.apache.kafka.connect.runtime.isolation.Plugins; @@ -50,6 +53,7 @@ import org.apache.kafka.connect.util.FutureCallback; import org.easymock.Capture; import org.easymock.EasyMock; import org.easymock.IAnswer; +import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -151,6 +155,7 @@ public class DistributedHerderTest { @Mock private WorkerGroupMember member; private MockTime time; private DistributedHerder herder; + private MockConnectMetrics metrics; @Mock private Worker worker; @Mock private Callback<Herder.Created<ConnectorInfo>> putConnectorCallback; @Mock @@ -165,12 +170,13 @@ public class DistributedHerderTest { @Before public void setUp() throws Exception { + time = new MockTime(); + metrics = new MockConnectMetrics(time); worker = PowerMock.createMock(Worker.class); EasyMock.expect(worker.isSinkConnector(CONN1)).andStubReturn(Boolean.FALSE); - time = new MockTime(); herder = PowerMock.createPartialMock(DistributedHerder.class, new String[]{"backoff", "connectorTypeForClass", "updateDeletedConnectorStatus"}, - new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, statusBackingStore, configBackingStore, member, MEMBER_URL, time); + new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, statusBackingStore, configBackingStore, member, MEMBER_URL, metrics, time); configUpdateListener = herder.new ConfigUpdateListener(); rebalanceListener = herder.new RebalanceListener(); @@ -182,6 +188,11 @@ public class DistributedHerderTest { PowerMock.expectPrivate(herder, "updateDeletedConnectorStatus").andVoid().anyTimes(); } + @After + public void tearDown() { + if (metrics != null) metrics.stop(); + } + @Test public void testJoinAssignment() throws Exception { // Join group and get assignment @@ -204,6 +215,8 @@ public class DistributedHerderTest { PowerMock.replayAll(); herder.tick(); + time.sleep(1000L); + assertStatistics(3, 1, 100, 1000L); PowerMock.verifyAll(); } @@ -241,9 +254,17 @@ public class DistributedHerderTest { PowerMock.replayAll(); + time.sleep(1000L); + assertStatistics(0, 0, 0, Double.POSITIVE_INFINITY); herder.tick(); + + time.sleep(2000L); + assertStatistics(3, 1, 100, 2000); herder.tick(); + time.sleep(3000L); + assertStatistics(3, 2, 100, 3000); + PowerMock.verifyAll(); } @@ -282,7 +303,12 @@ public class DistributedHerderTest { PowerMock.replayAll(); herder.tick(); + time.sleep(1000L); + assertStatistics(3, 1, 100, 1000L); + herder.tick(); + time.sleep(2000L); + assertStatistics(3, 2, 100, 2000L); PowerMock.verifyAll(); } @@ -345,6 +371,9 @@ public class DistributedHerderTest { herder.putConnectorConfig(CONN2, CONN2_CONFIG, false, putConnectorCallback); herder.tick(); + time.sleep(1000L); + assertStatistics(3, 1, 100, 1000L); + PowerMock.verifyAll(); } @@ -390,6 +419,9 @@ public class DistributedHerderTest { assertTrue(error.hasCaptured()); assertTrue(error.getValue() instanceof BadRequestException); + time.sleep(1000L); + assertStatistics(3, 1, 100, 1000L); + PowerMock.verifyAll(); } @@ -435,6 +467,9 @@ public class DistributedHerderTest { assertTrue(error.hasCaptured()); assertTrue(error.getValue() instanceof BadRequestException); + time.sleep(1000L); + assertStatistics(3, 1, 100, 1000L); + PowerMock.verifyAll(); } @@ -478,6 +513,9 @@ public class DistributedHerderTest { assertTrue(error.hasCaptured()); assertTrue(error.getValue() instanceof BadRequestException); + time.sleep(1000L); + assertStatistics(3, 1, 100, 1000L); + PowerMock.verifyAll(); } @@ -503,6 +541,9 @@ public class DistributedHerderTest { herder.putConnectorConfig(CONN1, CONN1_CONFIG, false, putConnectorCallback); herder.tick(); + time.sleep(1000L); + assertStatistics(3, 1, 100, 1000L); + PowerMock.verifyAll(); } @@ -535,6 +576,9 @@ public class DistributedHerderTest { herder.deleteConnectorConfig(CONN1, putConnectorCallback); herder.tick(); + time.sleep(1000L); + assertStatistics(3, 1, 100, 1000L); + PowerMock.verifyAll(); } @@ -672,10 +716,16 @@ public class DistributedHerderTest { PowerMock.replayAll(); herder.tick(); + time.sleep(1000L); + assertStatistics(3, 1, 100, 1000L); + FutureCallback<Void> callback = new FutureCallback<>(); herder.restartConnector(CONN1, callback); herder.tick(); + time.sleep(2000L); + assertStatistics(3, 1, 100, 3000L); + try { callback.get(1000L, TimeUnit.MILLISECONDS); fail("Expected NotLeaderException to be raised"); @@ -1202,7 +1252,13 @@ public class DistributedHerderTest { PowerMock.replayAll(); herder.tick(); + time.sleep(1000L); + assertStatistics("leaderUrl", true, 3, 0, Double.NEGATIVE_INFINITY, Double.POSITIVE_INFINITY); + herder.tick(); + time.sleep(2000L); + assertStatistics("leaderUrl", false, 3, 1, 100, 2000L); + PowerMock.verifyAll(); } @@ -1365,7 +1421,8 @@ public class DistributedHerderTest { rebalanceListener.onRevoked("leader", revokedConnectors, revokedTasks); ConnectProtocol.Assignment assignment = new ConnectProtocol.Assignment( error, "leader", "leaderUrl", offset, assignedConnectors, assignedTasks); - rebalanceListener.onAssigned(assignment, 0); + rebalanceListener.onAssigned(assignment, 3); + time.sleep(100L); return null; } }); @@ -1397,6 +1454,35 @@ public class DistributedHerderTest { EasyMock.expect(configBackingStore.snapshot()).andReturn(readToEndSnapshot); } + private void assertStatistics(int expectedEpoch, int completedRebalances, double rebalanceTime, double millisSinceLastRebalance) { + String expectedLeader = completedRebalances <= 0 ? null : "leaderUrl"; + assertStatistics(expectedLeader, false, expectedEpoch, completedRebalances, rebalanceTime, millisSinceLastRebalance); + } + + private void assertStatistics(String expectedLeader, boolean isRebalancing, int expectedEpoch, int completedRebalances, double rebalanceTime, double millisSinceLastRebalance) { + HerderMetrics herderMetrics = herder.herderMetrics(); + MetricGroup group = herderMetrics.metricGroup(); + double epoch = MockConnectMetrics.currentMetricValueAsDouble(metrics, group, "epoch"); + String leader = MockConnectMetrics.currentMetricValueAsString(metrics, group, "leader-name"); + double rebalanceCompletedTotal = MockConnectMetrics.currentMetricValueAsDouble(metrics, group, "completed-rebalances-total"); + double rebalancing = MockConnectMetrics.currentMetricValueAsDouble(metrics, group, "rebalancing"); + double rebalanceTimeMax = MockConnectMetrics.currentMetricValueAsDouble(metrics, group, "rebalance-max-time-ms"); + double rebalanceTimeAvg = MockConnectMetrics.currentMetricValueAsDouble(metrics, group, "rebalance-avg-time-ms"); + double rebalanceTimeSinceLast = MockConnectMetrics.currentMetricValueAsDouble(metrics, group, "time-since-last-rebalance-ms"); + + assertEquals(expectedEpoch, epoch, 0.0001d); + assertEquals(expectedLeader, leader); + assertEquals(completedRebalances, rebalanceCompletedTotal, 0.0001d); + assertEquals(isRebalancing ? 1.0d : 0.0d, rebalancing, 0.0001d); + assertEquals(millisSinceLastRebalance, rebalanceTimeSinceLast, 0.0001d); + if (rebalanceTime <= 0L) { + assertEquals(Double.NEGATIVE_INFINITY, rebalanceTimeMax, 0.0001d); + assertEquals(0.0d, rebalanceTimeAvg, 0.0001d); + } else { + assertEquals(rebalanceTime, rebalanceTimeMax, 0.0001d); + assertEquals(rebalanceTime, rebalanceTimeAvg, 0.0001d); + } + } // We need to use a real class here due to some issue with mocking java.lang.Class private abstract class BogusSourceConnector extends SourceConnector {
