Repository: samza Updated Branches: refs/heads/master 7fc370407 -> f4ba9cdc1
Moving store test to TestContainerStorageManager from TestSamzaContainer There was a test in TestSamzaContainer that needs to be moved to TestContainerStorageManager because the restore logic is moved there. Minor change in TestSamzaContainer and ContainerStorageManager Author: Ray Matharu <[email protected]> Reviewers: Jagadish <[email protected]> Closes #838 from rmatharu/storeTest-fix Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f4ba9cdc Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f4ba9cdc Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f4ba9cdc Branch: refs/heads/master Commit: f4ba9cdc18651b62f833229e00e2fdd07692dfc5 Parents: 7fc3704 Author: Ray Matharu <[email protected]> Authored: Mon Dec 3 18:08:36 2018 -0800 Committer: Jagadish <[email protected]> Committed: Mon Dec 3 18:08:36 2018 -0800 ---------------------------------------------------------------------- .../apache/samza/storage/StorageRecovery.java | 23 +++++++++--------- .../apache/samza/container/SamzaContainer.scala | 4 ++-- .../samza/storage/ContainerStorageManager.java | 25 ++++++++++---------- .../samza/container/TestSamzaContainer.scala | 20 ++++------------ .../processor/StreamProcessorTestUtils.scala | 3 ++- .../storage/TestContainerStorageManager.java | 19 ++++++++++++--- .../samza/storage/TestTaskStorageManager.scala | 3 +++ 7 files changed, 53 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/f4ba9cdc/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java index be074ee..64ae310 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java +++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java @@ -21,7 +21,6 @@ package org.apache.samza.storage; import java.io.File; import java.time.Duration; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -32,6 +31,7 @@ import org.apache.samza.config.Config; import org.apache.samza.config.JavaStorageConfig; import org.apache.samza.config.JavaSystemConfig; import org.apache.samza.config.StorageConfig; +import org.apache.samza.container.TaskName; import org.apache.samza.context.ContainerContext; import org.apache.samza.context.ContainerContextImpl; import org.apache.samza.context.JobContextImpl; @@ -72,10 +72,11 @@ public class StorageRecovery extends CommandLine { private HashMap<String, SystemStream> changeLogSystemStreams = new HashMap<>(); private HashMap<String, StorageEngineFactory<?, ?>> storageEngineFactories = new HashMap<>(); private Map<String, ContainerModel> containers = new HashMap<>(); - private List<TaskStorageManager> taskStorageManagers = new ArrayList<>(); + private ContainerStorageManager containerStorageManager; private Logger log = LoggerFactory.getLogger(StorageRecovery.class); private SystemAdmins systemAdmins = null; + /** * Construct the StorageRecovery * @@ -100,7 +101,7 @@ public class StorageRecovery extends CommandLine { getContainerModels(); getChangeLogSystemStreamsAndStorageFactories(); getChangeLogMaxPartitionNumber(); - getTaskStorageManagers(); + getContainerStorageManager(); } /** @@ -112,11 +113,8 @@ public class StorageRecovery extends CommandLine { log.info("start recovering..."); systemAdmins.start(); - for (TaskStorageManager taskStorageManager : taskStorageManagers) { - taskStorageManager.init(); - taskStorageManager.stopStores(); - log.debug("restored " + taskStorageManager.toString()); - } + this.containerStorageManager.start(); + this.containerStorageManager.shutdown(); systemAdmins.stop(); log.info("successfully recovered in " + storeBaseDir.toString()); @@ -201,8 +199,10 @@ public class StorageRecovery extends CommandLine { * List<TaskStorageManager> */ @SuppressWarnings({ "unchecked", "rawtypes" }) - private void getTaskStorageManagers() { + private void getContainerStorageManager() { Clock clock = SystemClock.instance(); + Map<TaskName, TaskStorageManager> taskStorageManagers = new HashMap<>(); + HashMap<String, SystemConsumer> storeConsumers = getStoreConsumers(); StreamMetadataCache streamMetadataCache = new StreamMetadataCache(systemAdmins, 5000, clock); // don't worry about prefetching for this; looks like the tool doesn't flush to offset files anyways SSPMetadataCache sspMetadataCache = @@ -213,7 +213,6 @@ public class StorageRecovery extends CommandLine { ContainerContext containerContext = new ContainerContextImpl(containerModel, new MetricsRegistryMap()); for (TaskModel taskModel : containerModel.getTasks().values()) { - HashMap<String, SystemConsumer> storeConsumers = getStoreConsumers(); for (Entry<String, StorageEngineFactory<?, ?>> entry : storageEngineFactories.entrySet()) { String storeName = entry.getKey(); @@ -253,8 +252,10 @@ public class StorageRecovery extends CommandLine { new StorageConfig(jobConfig).getChangeLogDeleteRetentionsInMs(), new SystemClock()); - taskStorageManagers.add(taskStorageManager); + taskStorageManagers.put(taskModel.getTaskName(), taskStorageManager); } } + + this.containerStorageManager = new ContainerStorageManager(taskStorageManagers, storeConsumers, null); } } http://git-wip-us.apache.org/repos/asf/samza/blob/f4ba9cdc/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index ed50719..94bc138 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -515,7 +515,7 @@ object SamzaContainer extends Logging { info("Created store system consumers: %s" format storeSystemConsumers) - var taskStorageManagers : Map[TaskInstance, TaskStorageManager] = Map() + var taskStorageManagers : Map[TaskName, TaskStorageManager] = Map() // Create taskInstances val taskInstances: Map[TaskName, TaskInstance] = containerModel.getTasks.values.asScala.map(taskModel => { @@ -679,7 +679,7 @@ object SamzaContainer extends Logging { val taskInstance = createTaskInstance(task) - taskStorageManagers += taskInstance -> storageManager + taskStorageManagers += taskInstance.taskName -> storageManager (taskName, taskInstance) }).toMap http://git-wip-us.apache.org/repos/asf/samza/blob/f4ba9cdc/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java index 5fc5573..c39d6e7 100644 --- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java +++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java @@ -28,7 +28,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.apache.samza.SamzaException; import org.apache.samza.container.SamzaContainerMetrics; -import org.apache.samza.container.TaskInstance; +import org.apache.samza.container.TaskName; import org.apache.samza.metrics.Gauge; import org.apache.samza.system.SystemConsumer; import org.slf4j.Logger; @@ -49,7 +49,7 @@ import org.slf4j.LoggerFactory; public class ContainerStorageManager { private static final Logger LOG = LoggerFactory.getLogger(ContainerStorageManager.class); - private final Map<TaskInstance, TaskStorageManager> taskStorageManagers; + private final Map<TaskName, TaskStorageManager> taskStorageManagers; private final SamzaContainerMetrics samzaContainerMetrics; // Mapping of from storeSystemNames to SystemConsumers @@ -61,7 +61,7 @@ public class ContainerStorageManager { // Naming convention to be used for restore threads private static final String RESTORE_THREAD_NAME = "Samza Restore Thread-%d"; - public ContainerStorageManager(Map<TaskInstance, TaskStorageManager> taskStorageManagers, + public ContainerStorageManager(Map<TaskName, TaskStorageManager> taskStorageManagers, Map<String, SystemConsumer> systemConsumers, SamzaContainerMetrics samzaContainerMetrics) { this.taskStorageManagers = taskStorageManagers; this.systemConsumers = systemConsumers; @@ -129,30 +129,31 @@ public class ContainerStorageManager { */ private class TaskRestoreCallable implements Callable<Void> { - private TaskInstance taskInstance; + private TaskName taskName; private TaskStorageManager taskStorageManager; private SamzaContainerMetrics samzaContainerMetrics; - public TaskRestoreCallable(SamzaContainerMetrics samzaContainerMetrics, TaskInstance taskInstance, + public TaskRestoreCallable(SamzaContainerMetrics samzaContainerMetrics, TaskName taskName, TaskStorageManager taskStorageManager) { this.samzaContainerMetrics = samzaContainerMetrics; - this.taskInstance = taskInstance; + this.taskName = taskName; this.taskStorageManager = taskStorageManager; } @Override public Void call() { long startTime = System.currentTimeMillis(); - LOG.info("Starting stores in task instance {}", this.taskInstance.taskName().getTaskName()); + LOG.info("Starting stores in task instance {}", this.taskName.getTaskName()); taskStorageManager.restoreStores(); long timeToRestore = System.currentTimeMillis() - startTime; - Gauge taskGauge = this.samzaContainerMetrics.taskStoreRestorationMetrics() - .getOrDefault(this.taskInstance.taskName().getTaskName(), null); - if (taskGauge != null) { - taskGauge.set(timeToRestore); - } + if (this.samzaContainerMetrics != null) { + Gauge taskGauge = this.samzaContainerMetrics.taskStoreRestorationMetrics().getOrDefault(this.taskName, null); + if (taskGauge != null) { + taskGauge.set(timeToRestore); + } + } return null; } } http://git-wip-us.apache.org/repos/asf/samza/blob/f4ba9cdc/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala index eca4673..760e358 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala @@ -28,14 +28,13 @@ import org.apache.samza.coordinator.JobModelManager import org.apache.samza.coordinator.server.{HttpServer, JobServlet} import org.apache.samza.job.model.{ContainerModel, JobModel, TaskModel} import org.apache.samza.metrics.{Gauge, Timer} +import org.apache.samza.storage.{ContainerStorageManager, TaskStorageManager} import org.apache.samza.system._ import org.apache.samza.{Partition, SamzaContainerStatus} import org.junit.Assert._ import org.junit.{Before, Test} import org.mockito.Matchers.{any, notNull} import org.mockito.Mockito._ -import org.mockito.invocation.InvocationOnMock -import org.mockito.stubbing.Answer import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations} import org.scalatest.junit.AssertionsForJUnit import org.scalatest.mockito.MockitoSugar @@ -67,6 +66,9 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { @Mock private var samzaContainerListener: SamzaContainerListener = null + @Mock + private var containerStorageManager: ContainerStorageManager = null + private var samzaContainer: SamzaContainer = null @Before @@ -153,18 +155,6 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { } @Test - def testStartStoresIncrementsCounter() { - when(this.taskInstance.taskName).thenReturn(TASK_NAME) - val restoreGauge = mock[Gauge[Long]] - when(this.metrics.taskStoreRestorationMetrics).thenReturn(Map(TASK_NAME -> restoreGauge)) - - this.samzaContainer.startStores - val restoreGaugeValueCaptor = ArgumentCaptor.forClass(classOf[Long]) - verify(restoreGauge).set(restoreGaugeValueCaptor.capture()) - assertTrue(restoreGaugeValueCaptor.getValue >= 1) - } - - @Test def testApplicationContainerContext() { val orderVerifier = inOrder(this.applicationContainerContext, this.runLoop) this.samzaContainer.run @@ -278,7 +268,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { this.producerMultiplexer, metrics, containerContext = this.containerContext, - applicationContainerContextOption = applicationContainerContext, containerStorageManager = null) + applicationContainerContextOption = applicationContainerContext, containerStorageManager = containerStorageManager) this.samzaContainer.setContainerListener(this.samzaContainerListener) } http://git-wip-us.apache.org/repos/asf/samza/blob/f4ba9cdc/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala index d7c71fa..69bae4b 100644 --- a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala +++ b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala @@ -26,6 +26,7 @@ import org.apache.samza.container._ import org.apache.samza.context.{ContainerContext, JobContext} import org.apache.samza.job.model.TaskModel import org.apache.samza.serializers.SerdeManager +import org.apache.samza.storage.ContainerStorageManager import org.apache.samza.system._ import org.apache.samza.system.chooser.RoundRobinChooser import org.apache.samza.task.{StreamTask, TaskInstanceCollector} @@ -68,7 +69,7 @@ object StreamProcessorTestUtils { metrics = new SamzaContainerMetrics, containerContext = containerContext, applicationContainerContextOption = None, - containerStorageManager = null) + containerStorageManager = Mockito.mock(classOf[ContainerStorageManager])) container } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/f4ba9cdc/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java index dba8678..5e71efc 100644 --- a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java +++ b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java @@ -24,6 +24,7 @@ import java.util.concurrent.CountDownLatch; import org.apache.samza.container.SamzaContainerMetrics; import org.apache.samza.container.TaskInstance; import org.apache.samza.container.TaskName; +import org.apache.samza.metrics.Gauge; import org.apache.samza.system.SystemConsumer; import org.junit.Assert; import org.junit.Before; @@ -35,7 +36,7 @@ public class TestContainerStorageManager { private ContainerStorageManager containerStorageManager; private Map<String, SystemConsumer> systemConsumers; - private Map<TaskInstance, TaskStorageManager> taskStorageManagers; + private Map<TaskName, TaskStorageManager> taskStorageManagers; private SamzaContainerMetrics samzaContainerMetrics; private CountDownLatch taskStorageManagersRestoreStoreCount; @@ -45,6 +46,8 @@ public class TestContainerStorageManager { private CountDownLatch systemConsumerStartCount; private CountDownLatch systemConsumerStopCount; + private Map<TaskName, Gauge<Object>> taskRestoreMetricGauges; + /** * Utility method for creating a mocked taskInstance and taskStorageManager and adding it to the map. * @param taskname the desired taskname. @@ -71,11 +74,15 @@ public class TestContainerStorageManager { return null; }).when(mockTaskStorageManager).restoreStores(); - taskStorageManagers.put(mockTaskInstance, mockTaskStorageManager); + taskStorageManagers.put(new TaskName(taskname), mockTaskStorageManager); + + Gauge testGauge = Mockito.mock(Gauge.class); + this.taskRestoreMetricGauges.put(new TaskName(taskname), testGauge); } @Before public void setUp() { + taskRestoreMetricGauges = new HashMap<>(); systemConsumers = new HashMap<>(); taskStorageManagers = new HashMap<>(); @@ -93,6 +100,7 @@ public class TestContainerStorageManager { // mock container metrics samzaContainerMetrics = Mockito.mock(SamzaContainerMetrics.class); + Mockito.when(samzaContainerMetrics.taskStoreRestorationMetrics()).thenReturn(taskRestoreMetricGauges); // mock and setup sysconsumers SystemConsumer mockSystemConsumer = Mockito.mock(SystemConsumer.class); @@ -112,7 +120,7 @@ public class TestContainerStorageManager { } @Test - public void testParallelism() { + public void testParallelismAndMetrics() { this.containerStorageManager.start(); this.containerStorageManager.shutdown(); Assert.assertTrue("init count should be 0", this.taskStorageManagersInitCount.getCount() == 0); @@ -121,5 +129,10 @@ public class TestContainerStorageManager { Assert.assertTrue("systemConsumerStopCount count should be 0", this.systemConsumerStopCount.getCount() == 0); Assert.assertTrue("systemConsumerStartCount count should be 0", this.systemConsumerStartCount.getCount() == 0); + + for (Gauge gauge : taskRestoreMetricGauges.values()) { + Assert.assertTrue("Restoration time gauge value should be invoked atleast once", Mockito.mockingDetails(gauge).getInvocations().size() >= 1); + } } + } http://git-wip-us.apache.org/repos/asf/samza/blob/f4ba9cdc/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala index 0b945cb..ffdceca 100644 --- a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala +++ b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala @@ -106,6 +106,9 @@ class TestTaskStorageManager extends MockitoSugar { taskManager.init + // mocking restore (issued by ContainerStorageManager) + mockStorageEngine.restore(mock[util.Iterator[IncomingMessageEnvelope]]) + assertTrue(storeFile.exists()) assertFalse(offsetFile.exists()) verify(mockSystemConsumer).register(ssp, "0")
