Merge branch 'master' into 0.14.0
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1701ea84 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1701ea84 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1701ea84 Branch: refs/heads/master Commit: 1701ea84a2e029b3297687b5fc814998371b1a6f Parents: 79200c7 fb39a51 Author: Xinyu Liu <[email protected]> Authored: Tue Sep 12 11:32:36 2017 -0700 Committer: Xinyu Liu <[email protected]> Committed: Tue Sep 12 11:32:36 2017 -0700 ---------------------------------------------------------------------- KEYS | 57 +++ NOTICE | 1 + README.md | 2 +- bin/check-all.sh | 2 +- build.gradle | 4 +- docs/contribute/tests.md | 2 +- .../versioned/jobs/configuration-table.html | 14 +- .../versioned/hello-samza-high-level-yarn.md | 2 +- .../versioned/hello-samza-high-level-zk.md | 2 +- .../versioned/samza-rest-getting-started.md | 2 +- docs/startup/download/index.md | 17 +- docs/startup/hello-samza/versioned/index.md | 2 +- docs/startup/preview/index.md | 2 +- .../main/java/org/apache/samza/AzureClient.java | 20 +- .../main/java/org/apache/samza/AzureConfig.java | 73 --- .../main/java/org/apache/samza/BlobUtils.java | 280 ---------- .../java/org/apache/samza/JobModelBundle.java | 61 --- .../java/org/apache/samza/LeaseBlobManager.java | 98 ---- .../java/org/apache/samza/ProcessorEntity.java | 58 --- .../main/java/org/apache/samza/TableUtils.java | 198 -------- .../org/apache/samza/config/AzureConfig.java | 68 +++ .../coordinator/AzureCoordinationUtils.java | 58 +++ .../AzureCoordinationUtilsFactory.java | 30 ++ .../samza/coordinator/AzureJobCoordinator.java | 509 +++++++++++++++++++ .../coordinator/AzureJobCoordinatorFactory.java | 29 ++ .../samza/coordinator/AzureLeaderElector.java | 109 ++++ .../org/apache/samza/coordinator/AzureLock.java | 100 ++++ .../samza/coordinator/DistributedLock.java | 39 ++ .../samza/coordinator/data/BarrierState.java | 27 + .../samza/coordinator/data/JobModelBundle.java 
| 61 +++ .../samza/coordinator/data/ProcessorEntity.java | 62 +++ .../scheduler/HeartbeatScheduler.java | 81 +++ .../scheduler/JMVersionUpgradeScheduler.java | 99 ++++ .../LeaderBarrierCompleteScheduler.java | 118 +++++ .../scheduler/LeaderLivenessCheckScheduler.java | 120 +++++ .../scheduler/LivenessCheckScheduler.java | 108 ++++ .../scheduler/RenewLeaseScheduler.java | 79 +++ .../scheduler/SchedulerStateChangeListener.java | 29 ++ .../coordinator/scheduler/TaskScheduler.java | 35 ++ .../java/org/apache/samza/util/BlobUtils.java | 284 +++++++++++ .../org/apache/samza/util/LeaseBlobManager.java | 99 ++++ .../java/org/apache/samza/util/TableUtils.java | 205 ++++++++ .../samza/config/JobCoordinatorConfig.java | 21 + .../apache/samza/container/LocalityManager.java | 6 +- .../coordinator/CoordinationServiceFactory.java | 36 -- .../samza/coordinator/CoordinationUtils.java | 14 +- .../coordinator/CoordinationUtilsFactory.java | 47 ++ .../coordinator/DistributedLockWithState.java | 42 ++ .../samza/runtime/LocalApplicationRunner.java | 91 ++-- .../org/apache/samza/task/AsyncRunLoop.java | 14 + .../samza/zk/ZkBarrierForVersionUpgrade.java | 2 +- .../org/apache/samza/zk/ZkControllerImpl.java | 19 +- .../samza/zk/ZkCoordinationServiceFactory.java | 89 ---- .../apache/samza/zk/ZkCoordinationUtils.java | 26 +- .../samza/zk/ZkCoordinationUtilsFactory.java | 89 ++++ .../org/apache/samza/zk/ZkDistributedLock.java | 117 +++++ .../samza/zk/ZkJobCoordinatorFactory.java | 23 +- .../org/apache/samza/zk/ZkLeaderElector.java | 2 +- .../org/apache/samza/zk/ZkProcessorLatch.java | 23 +- .../main/java/org/apache/samza/zk/ZkUtils.java | 41 +- .../apache/samza/checkpoint/OffsetManager.scala | 50 +- .../apache/samza/container/TaskInstance.scala | 4 +- .../runtime/TestApplicationRunnerMain.java | 2 +- .../runtime/TestLocalApplicationRunner.java | 185 ++++--- .../org/apache/samza/task/TestAsyncRunLoop.java | 24 +- .../apache/samza/zk/TestZkLeaderElector.java | 7 +- 
.../org/apache/samza/zk/TestZkNamespace.java | 8 +- .../apache/samza/zk/TestZkProcessorLatch.java | 2 +- .../java/org/apache/samza/zk/TestZkUtils.java | 57 ++- .../samza/checkpoint/TestOffsetManager.scala | 58 ++- .../samza/config/TestJobCoordinatorConfig.java | 58 +++ .../samza/container/TestTaskInstance.scala | 62 ++- .../org/apache/samza/rest/SamzaRestService.java | 14 +- .../processor/TestZkStreamProcessorSession.java | 3 +- .../processor/TestZkLocalApplicationRunner.java | 60 ++- .../test/integration/TestStatefulTask.scala | 79 ++- settings.gradle | 17 + 77 files changed, 3444 insertions(+), 1194 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/1701ea84/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/1701ea84/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/1701ea84/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/1701ea84/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java ---------------------------------------------------------------------- diff --cc samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java index 4be4e73,f9c1252..5b2c661 --- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java +++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java @@@ -19,7 -19,13 +19,14 @@@ package org.apache.samza.runtime; + import com.google.common.collect.ImmutableList; + import java.lang.reflect.Field; + import java.util.Collections; + import 
java.util.HashMap; + import java.util.List; + import java.util.Map; + import java.util.stream.Collectors; +import java.util.Set; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.JobConfig; @@@ -343,8 -324,73 +325,77 @@@ public class TestLocalApplicationRunne assertEquals(spy.status(app), ApplicationStatus.UnsuccessfulFinish); } + public static Set<StreamProcessor> getProcessors(LocalApplicationRunner runner) { + return runner.getProcessors(); + } + + /** + * A test case to verify if the plan results in different hash if there is change in topological sort order. + * Note: the overall JOB PLAN remains the same outside the scope of intermediate streams, for the sake of these test cases. + */ + @Test + public void testPlanIdWithShuffledStreamSpecs() { + List<StreamSpec> streamSpecs = ImmutableList.of(new StreamSpec("test-stream-1", "stream-1", "testStream"), + new StreamSpec("test-stream-2", "stream-2", "testStream"), + new StreamSpec("test-stream-3", "stream-3", "testStream")); + String planIdBeforeShuffle = getExecutionPlanId(streamSpecs); + + List<StreamSpec> shuffledStreamSpecs = ImmutableList.of(new StreamSpec("test-stream-2", "stream-2", "testStream"), + new StreamSpec("test-stream-1", "stream-1", "testStream"), + new StreamSpec("test-stream-3", "stream-3", "testStream")); + + + assertFalse("Expected both of the latch ids to be different", + planIdBeforeShuffle.equals(getExecutionPlanId(shuffledStreamSpecs))); + } + + /** + * A test case to verify if the plan results in same hash in case of same plan. + * Note: the overall JOB PLAN remains the same outside the scope of intermediate streams, for the sake of these test cases.
+ */ + @Test + public void testGeneratePlanIdWithSameStreamSpecs() { + List<StreamSpec> streamSpecs = ImmutableList.of(new StreamSpec("test-stream-1", "stream-1", "testStream"), + new StreamSpec("test-stream-2", "stream-2", "testStream"), + new StreamSpec("test-stream-3", "stream-3", "testStream")); + String planIdForFirstAttempt = getExecutionPlanId(streamSpecs); + String planIdForSecondAttempt = getExecutionPlanId(streamSpecs); + + assertEquals("Expected latch ids to match!", "1447946713", planIdForFirstAttempt); + assertEquals("Expected latch ids to match for the second attempt!", planIdForFirstAttempt, planIdForSecondAttempt); + } + + /** + * A test case to verify plan results in different hash in case of different intermediate stream. + * Note: the overall JOB PLAN remains the same outside the scope of intermediate streams, for the sake of these test cases. + */ + @Test + public void testGeneratePlanIdWithDifferentStreamSpecs() { + List<StreamSpec> streamSpecs = ImmutableList.of(new StreamSpec("test-stream-1", "stream-1", "testStream"), + new StreamSpec("test-stream-2", "stream-2", "testStream"), + new StreamSpec("test-stream-3", "stream-3", "testStream")); + String planIdBeforeShuffle = getExecutionPlanId(streamSpecs); + + List<StreamSpec> updatedStreamSpecs = ImmutableList.of(new StreamSpec("test-stream-1", "stream-1", "testStream"), + new StreamSpec("test-stream-4", "stream-4", "testStream"), + new StreamSpec("test-stream-3", "stream-3", "testStream")); + + + assertFalse("Expected both of the latch ids to be different", + planIdBeforeShuffle.equals(getExecutionPlanId(updatedStreamSpecs))); + } + + private String getExecutionPlanId(List<StreamSpec> updatedStreamSpecs) { + String intermediateStreamJson = + updatedStreamSpecs.stream().map(this::streamSpecToJson).collect(Collectors.joining(",")); + + int planId = String.format(PLAN_JSON, intermediateStreamJson).hashCode(); + + return String.valueOf(planId); + } + + private String streamSpecToJson(StreamSpec
streamSpec) { + return String.format(STREAM_SPEC_JSON_FORMAT, streamSpec.getId(), streamSpec.getId(), streamSpec.getSystemName(), + streamSpec.getPhysicalName()); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/1701ea84/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/1701ea84/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/1701ea84/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala ---------------------------------------------------------------------- diff --cc samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala index dcb06d3,4958a57..81f3ed1 --- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala @@@ -19,34 -19,16 +19,32 @@@ package org.apache.samza.container + +import java.util +import java.util +import java.util.Collections import java.util.concurrent.ConcurrentHashMap +import com.google.common.collect.Multimap +import org.apache.samza.SamzaException + import org.apache.samza.Partition +import org.apache.samza.checkpoint.OffsetManager +import org.apache.samza.config.Config +import org.apache.samza.config.MapConfig +import org.apache.samza.control.ControlMessageUtils +import org.apache.samza.job.model.ContainerModel +import org.apache.samza.job.model.JobModel +import org.apache.samza.job.model.TaskModel +import org.apache.samza.metrics.Counter +import org.apache.samza.metrics.Metric +import org.apache.samza.metrics.MetricsRegistryMap + import org.apache.samza.checkpoint.{Checkpoint, OffsetManager} + import org.apache.samza.config.{Config, MapConfig} + import org.apache.samza.metrics.{Counter, Metric, 
MetricsRegistryMap} import org.apache.samza.serializers.SerdeManager - import org.apache.samza.system.IncomingMessageEnvelope - import org.apache.samza.system.SystemConsumer - import org.apache.samza.system.SystemConsumers - import org.apache.samza.system.SystemProducer - import org.apache.samza.system.SystemProducers - import org.apache.samza.system.SystemStream - import org.apache.samza.system.SystemStreamMetadata + import org.apache.samza.storage.TaskStorageManager import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata - import org.apache.samza.system.SystemStreamPartition + import org.apache.samza.system._ import org.apache.samza.system.chooser.RoundRobinChooser import org.apache.samza.task._ import org.junit.Assert._ @@@ -365,34 -350,47 +366,77 @@@ class TestTaskInstance } @Test + def testCommitOrder { + // Simple objects + val partition = new Partition(0) + val taskName = new TaskName("taskName") + val systemStream = new SystemStream("test-system", "test-stream") + val systemStreamPartition = new SystemStreamPartition(systemStream, partition) + val checkpoint = new Checkpoint(Map(systemStreamPartition -> "4").asJava) + + // Mocks + val collector = Mockito.mock(classOf[TaskInstanceCollector]) + val storageManager = Mockito.mock(classOf[TaskStorageManager]) + val offsetManager = Mockito.mock(classOf[OffsetManager]) + when(offsetManager.buildCheckpoint(any())).thenReturn(checkpoint) + val mockOrder = inOrder(offsetManager, collector, storageManager) + + val taskInstance: TaskInstance = new TaskInstance( + Mockito.mock(classOf[StreamTask]).asInstanceOf[StreamTask], + taskName, + new MapConfig, + new TaskInstanceMetrics, + null, + Mockito.mock(classOf[SystemConsumers]), + collector, + Mockito.mock(classOf[SamzaContainerContext]), + offsetManager, + storageManager, + systemStreamPartitions = Set(systemStreamPartition)) + + taskInstance.commit + + // We must first get a snapshot of the checkpoint so it doesn't change while we flush. 
SAMZA-1384 + mockOrder.verify(offsetManager).buildCheckpoint(taskName) + // Producers must be flushed next and ideally the output would be flushed before the changelog + // s.t. the changelog and checkpoints (state and inputs) are captured last + mockOrder.verify(collector).flush + // Local state is next, to ensure that the state (particularly the offset file) never points to a newer changelog + // offset than what is reflected in the on disk state. + mockOrder.verify(storageManager).flush() + // Finally, checkpoint the inputs with the snapshotted checkpoint captured at the beginning of commit + mockOrder.verify(offsetManager).writeCheckpoint(taskName, checkpoint) + } ++ ++ @Test + def testBuildInputToTasks = { + val system: String = "test-system" + val stream0: String = "test-stream-0" + val stream1: String = "test-stream-1" + + val ssp0: SystemStreamPartition = new SystemStreamPartition(system, stream0, new Partition(0)) + val ssp1: SystemStreamPartition = new SystemStreamPartition(system, stream0, new Partition(1)) + val ssp2: SystemStreamPartition = new SystemStreamPartition(system, stream1, new Partition(0)) + + val task0: TaskName = new TaskName("Task 0") + val task1: TaskName = new TaskName("Task 1") + val ssps: util.Set[SystemStreamPartition] = new util.HashSet[SystemStreamPartition] + ssps.add(ssp0) + ssps.add(ssp2) + val tm0: TaskModel = new TaskModel(task0, ssps, new Partition(0)) + val cm0: ContainerModel = new ContainerModel("c0", 0, Collections.singletonMap(task0, tm0)) + val tm1: TaskModel = new TaskModel(task1, Collections.singleton(ssp1), new Partition(1)) + val cm1: ContainerModel = new ContainerModel("c1", 1, Collections.singletonMap(task1, tm1)) + + val cms: util.Map[String, ContainerModel] = new util.HashMap[String, ContainerModel] + cms.put(cm0.getProcessorId, cm0) + cms.put(cm1.getProcessorId, cm1) + + val jobModel: JobModel = new JobModel(new MapConfig, cms, null) + val streamToTasks: Multimap[SystemStream, String] = 
TaskInstance.buildInputToTasks(jobModel) + assertEquals(streamToTasks.get(ssp0.getSystemStream).size, 2) + assertEquals(streamToTasks.get(ssp2.getSystemStream).size, 1) + } } class MockSystemAdmin extends SystemAdmin {
