Repository: samza
Updated Branches:
  refs/heads/master adfc4bfc4 -> d806e9dab
Fix test failures caused by PR345 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/d806e9da Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/d806e9da Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/d806e9da Branch: refs/heads/master Commit: d806e9dab12c2299c55fa6e5e95d640f3a008c28 Parents: adfc4bf Author: xiliu <[email protected]> Authored: Tue Nov 7 18:00:03 2017 -0800 Committer: xiliu <[email protected]> Committed: Tue Nov 7 18:00:03 2017 -0800 ---------------------------------------------------------------------- .../org/apache/samza/operators/impl/OperatorImpl.java | 12 +++++++++--- .../samza/operators/impl/TestOperatorImplGraph.java | 10 +++++++++- 2 files changed, 18 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/d806e9da/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java index 96dcd89..92a563a 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java @@ -111,9 +111,15 @@ public abstract class OperatorImpl<M, RM> { TaskContextImpl taskContext = (TaskContextImpl) context; this.eosStates = (EndOfStreamStates) taskContext.fetchObject(EndOfStreamStates.class.getName()); this.watermarkStates = (WatermarkStates) taskContext.fetchObject(WatermarkStates.class.getName()); - ContainerModel containerModel = taskContext.getJobModel().getContainers() - .get(context.getSamzaContainerContext().id); - this.taskModel = containerModel.getTasks().get(taskName); + + if (taskContext.getJobModel() != null) { + ContainerModel 
containerModel = taskContext.getJobModel().getContainers() + .get(context.getSamzaContainerContext().id); + this.taskModel = containerModel.getTasks().get(taskName); + } else { + this.taskModel = null; + this.usedInCurrentTask = true; + } handleInit(config, context); http://git-wip-us.apache.org/repos/asf/samza/blob/d806e9da/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java index 47e55a8..3f48cf2 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java @@ -30,6 +30,7 @@ import org.apache.samza.Partition; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; +import org.apache.samza.container.SamzaContainerContext; import org.apache.samza.container.TaskContextImpl; import org.apache.samza.container.TaskName; import org.apache.samza.job.model.ContainerModel; @@ -149,8 +150,15 @@ public class TestOperatorImplGraph { when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); when(mockTaskContext.getTaskName()).thenReturn(new TaskName("task 0")); JobModel jobModel = mock(JobModel.class); - when(jobModel.getContainers()).thenReturn(Collections.EMPTY_MAP); + ContainerModel containerModel = mock(ContainerModel.class); + TaskModel taskModel = mock(TaskModel.class); + when(jobModel.getContainers()).thenReturn(Collections.singletonMap("0", containerModel)); + when(containerModel.getTasks()).thenReturn(Collections.singletonMap(new TaskName("task 0"), taskModel)); + when(taskModel.getSystemStreamPartitions()).thenReturn(Collections.emptySet()); 
when(mockTaskContext.getJobModel()).thenReturn(jobModel); + SamzaContainerContext containerContext = + new SamzaContainerContext("0", mockConfig, Collections.singleton(new TaskName("task 0"))); + when(mockTaskContext.getSamzaContainerContext()).thenReturn(containerContext); OperatorImplGraph opImplGraph = new OperatorImplGraph(streamGraph, mockConfig, mockTaskContext, mock(Clock.class));
