SAMZA-444; provide a Samza job data model for the job coordinator
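This patch replaces the untyped JSON helpers (JsonHelpers, JsonConfigSerializer, TaskNamesToSystemStreamPartitions) with a typed job data model — a JobModel holds ContainerModels, which hold TaskModels — that the new JobCoordinator serves over HTTP and that SamzaContainer reads back at startup. As a rough illustration of the new API (not part of the commit itself; the stream name, config values, and ids below are made up), building a model and round-tripping it through SamzaObjectMapper looks something like this:

import java.util.Collections;
import org.apache.samza.Partition;
import org.apache.samza.config.MapConfig;
import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.job.model.TaskModel;
import org.apache.samza.serializers.model.SamzaObjectMapper;
import org.apache.samza.system.SystemStreamPartition;
import org.codehaus.jackson.map.ObjectMapper;

public class JobModelExample {
  public static void main(String[] args) throws Exception {
    // One task that reads a single (hypothetical) Kafka partition and owns changelog partition 0.
    TaskName taskName = new TaskName("Partition 0");
    TaskModel task = new TaskModel(
        taskName,
        Collections.singleton(new SystemStreamPartition("kafka", "my-stream", new Partition(0))),
        new Partition(0));

    // One container (id 0) running that single task.
    ContainerModel container = new ContainerModel(0, Collections.singletonMap(taskName, task));

    // The full job model: config plus the container assignments.
    JobModel jobModel = new JobModel(
        new MapConfig(Collections.singletonMap("job.name", "example")),
        Collections.singletonMap(0, container));

    // Serialize and deserialize with the Jackson mapper configured by this patch,
    // as exercised by the new TestSamzaObjectMapper.
    ObjectMapper mapper = SamzaObjectMapper.getObjectMapper();
    String json = mapper.writeValueAsString(jobModel);
    JobModel roundTripped = mapper.readValue(json, JobModel.class);
    System.out.println(roundTripped);
  }
}

The mix-ins below map the models' camel-case getters to hyphenated JSON fields (container-id, task-name, system-stream-partitions, changelog-partition) via CamelCaseToDashesStrategy, so the data model itself carries no Jackson dependency.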
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/6f595bed Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/6f595bed Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/6f595bed Branch: refs/heads/master Commit: 6f595beda2482ab85c47f40ae8345c2591007367 Parents: f6d3415 Author: Chris Riccomini <[email protected]> Authored: Tue Oct 28 09:04:14 2014 -0700 Committer: Chris Riccomini <[email protected]> Committed: Tue Oct 28 09:04:14 2014 -0700 ---------------------------------------------------------------------- .../apache/samza/job/model/ContainerModel.java | 92 ++++++++ .../org/apache/samza/job/model/JobModel.java | 90 ++++++++ .../org/apache/samza/job/model/TaskModel.java | 108 +++++++++ .../model/JsonContainerModelMixIn.java | 42 ++++ .../serializers/model/JsonJobModelMixIn.java | 41 ++++ .../serializers/model/JsonTaskModelMixIn.java | 45 ++++ .../serializers/model/SamzaObjectMapper.java | 198 ++++++++++++++++ .../samza/checkpoint/CheckpointTool.scala | 9 +- .../serializers/JsonConfigSerializer.scala | 39 ---- .../apache/samza/container/SamzaContainer.scala | 75 ++++--- .../TaskNamesToSystemStreamPartitions.scala | 145 ------------ .../grouper/task/GroupByContainerCount.scala | 47 ++-- .../grouper/task/TaskNameGrouper.scala | 35 ++- .../samza/coordinator/JobCoordinator.scala | 225 +++++++++++++++++++ .../samza/coordinator/server/HttpServer.scala | 57 ++++- .../samza/coordinator/server/JobServlet.scala | 50 +---- .../samza/coordinator/server/ServletBase.scala | 26 ++- .../org/apache/samza/job/local/ProcessJob.scala | 13 +- .../samza/job/local/ProcessJobFactory.scala | 60 +++-- .../samza/job/local/ThreadJobFactory.scala | 30 +-- .../org/apache/samza/util/JsonHelpers.scala | 93 -------- .../main/scala/org/apache/samza/util/Util.scala | 225 ++----------------- .../model/TestSamzaObjectMapper.java | 59 +++++ .../samza/container/TestSamzaContainer.scala | 64 +++--- .../TestTaskNamesToSystemStreamPartitions.scala | 71 ------ .../task/TestGroupByContainerCount.scala | 65 ++++-- .../samza/coordinator/TestJobCoordinator.scala | 119 ++++++++++ .../scala/org/apache/samza/util/TestUtil.scala | 70 +----- .../resources/scalate/WEB-INF/views/index.scaml | 10 +- .../apache/samza/job/yarn/SamzaAppMaster.scala | 7 +- .../samza/job/yarn/SamzaAppMasterService.scala | 12 +- .../samza/job/yarn/SamzaAppMasterState.scala | 7 +- .../job/yarn/SamzaAppMasterTaskManager.scala | 15 +- .../org/apache/samza/job/yarn/YarnJob.scala | 4 +- .../webapp/ApplicationMasterRestServlet.scala | 29 ++- .../job/yarn/TestSamzaAppMasterService.scala | 16 +- .../yarn/TestSamzaAppMasterTaskManager.scala | 36 +-- 37 files changed, 1393 insertions(+), 936 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java b/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java new file mode 100644 index 0000000..98a34bc --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.job.model; + +import java.util.Collections; +import java.util.Map; +import org.apache.samza.container.TaskName; + +/** + * <p> + * The data model is used to define which TaskModels a SamzaContainer should + * process. The model is used in the job coordinator and SamzaContainer to + * determine how to execute Samza jobs. + * </p> + * + * <p> + * The hierarchy for a Samza's job data model is that jobs have containers, and + * containers have tasks. Each data model contains relevant information, such as + * an id, partition information, etc. + * </p> + */ +public class ContainerModel implements Comparable<ContainerModel> { + private final int containerId; + private final Map<TaskName, TaskModel> tasks; + + public ContainerModel(int containerId, Map<TaskName, TaskModel> tasks) { + this.containerId = containerId; + this.tasks = Collections.unmodifiableMap(tasks); + } + + public int getContainerId() { + return containerId; + } + + public Map<TaskName, TaskModel> getTasks() { + return tasks; + } + + @Override + public String toString() { + return "ContainerModel [containerId=" + containerId + ", tasks=" + tasks + "]"; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + containerId; + result = prime * result + ((tasks == null) ? 0 : tasks.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + ContainerModel other = (ContainerModel) obj; + if (containerId != other.containerId) + return false; + if (tasks == null) { + if (other.tasks != null) + return false; + } else if (!tasks.equals(other.tasks)) + return false; + return true; + } + + public int compareTo(ContainerModel other) { + return containerId - other.getContainerId(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java new file mode 100644 index 0000000..c2b49c4 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.job.model; + +import java.util.Collections; +import java.util.Map; +import org.apache.samza.config.Config; + +/** + * <p> + * The data model used to represent a Samza job. The model is used in the job + * coordinator and SamzaContainer to determine how to execute Samza jobs. + * </p> + * + * <p> + * The hierarchy for a Samza's job data model is that jobs have containers, and + * containers have tasks. Each data model contains relevant information, such as + * an id, partition information, etc. + * </p> + */ +public class JobModel { + private final Config config; + private final Map<Integer, ContainerModel> containers; + + public JobModel(Config config, Map<Integer, ContainerModel> containers) { + this.config = config; + this.containers = Collections.unmodifiableMap(containers); + } + + public Config getConfig() { + return config; + } + + public Map<Integer, ContainerModel> getContainers() { + return containers; + } + + @Override + public String toString() { + return "JobModel [config=" + config + ", containers=" + containers + "]"; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((config == null) ? 0 : config.hashCode()); + result = prime * result + ((containers == null) ? 0 : containers.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + JobModel other = (JobModel) obj; + if (config == null) { + if (other.config != null) + return false; + } else if (!config.equals(other.config)) + return false; + if (containers == null) { + if (other.containers != null) + return false; + } else if (!containers.equals(other.containers)) + return false; + return true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java b/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java new file mode 100644 index 0000000..eb22d2e --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.job.model; + +import java.util.Collections; +import java.util.Set; +import org.apache.samza.Partition; +import org.apache.samza.container.TaskName; +import org.apache.samza.system.SystemStreamPartition; + +/** + * <p> + * The data model used to represent a task. The model is used in the job + * coordinator and SamzaContainer to determine how to execute Samza jobs. + * </p> + * + * <p> + * The hierarchy for a Samza's job data model is that jobs have containers, and + * containers have tasks. Each data model contains relevant information, such as + * an id, partition information, etc. + * </p> + */ +public class TaskModel implements Comparable<TaskModel> { + private final TaskName taskName; + private final Set<SystemStreamPartition> systemStreamPartitions; + private final Partition changelogPartition; + + public TaskModel(TaskName taskName, Set<SystemStreamPartition> systemStreamPartitions, Partition changelogPartition) { + this.taskName = taskName; + this.systemStreamPartitions = Collections.unmodifiableSet(systemStreamPartitions); + this.changelogPartition = changelogPartition; + } + + public TaskName getTaskName() { + return taskName; + } + + public Set<SystemStreamPartition> getSystemStreamPartitions() { + return systemStreamPartitions; + } + + public Partition getChangelogPartition() { + return changelogPartition; + } + + @Override + public String toString() { + return "TaskModel [taskName=" + taskName + ", systemStreamPartitions=" + systemStreamPartitions + ", changeLogPartition=" + changelogPartition + "]"; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((changelogPartition == null) ? 0 : changelogPartition.hashCode()); + result = prime * result + ((systemStreamPartitions == null) ? 0 : systemStreamPartitions.hashCode()); + result = prime * result + ((taskName == null) ? 
0 : taskName.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + TaskModel other = (TaskModel) obj; + if (changelogPartition == null) { + if (other.changelogPartition != null) + return false; + } else if (!changelogPartition.equals(other.changelogPartition)) + return false; + if (systemStreamPartitions == null) { + if (other.systemStreamPartitions != null) + return false; + } else if (!systemStreamPartitions.equals(other.systemStreamPartitions)) + return false; + if (taskName == null) { + if (other.taskName != null) + return false; + } else if (!taskName.equals(other.taskName)) + return false; + return true; + } + + public int compareTo(TaskModel other) { + return taskName.compareTo(other.getTaskName()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/java/org/apache/samza/serializers/model/JsonContainerModelMixIn.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonContainerModelMixIn.java b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonContainerModelMixIn.java new file mode 100644 index 0000000..f197a95 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonContainerModelMixIn.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.serializers.model; + +import java.util.Map; +import org.apache.samza.container.TaskName; +import org.apache.samza.job.model.TaskModel; +import org.codehaus.jackson.annotate.JsonCreator; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * A mix-in Jackson class to convert Samza's ContainerModel to/from JSON. 
+ */ +public abstract class JsonContainerModelMixIn { + @JsonCreator + public JsonContainerModelMixIn(@JsonProperty("container-id") int containerId, @JsonProperty("tasks") Map<TaskName, TaskModel> tasks) { + } + + @JsonProperty("container-id") + abstract int getContainerId(); + + @JsonProperty("tasks") + abstract Map<TaskName, TaskModel> getTasks(); +} + http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobModelMixIn.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobModelMixIn.java b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobModelMixIn.java new file mode 100644 index 0000000..037b5e2 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobModelMixIn.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.serializers.model; + +import java.util.Map; +import org.apache.samza.config.Config; +import org.apache.samza.job.model.ContainerModel; +import org.codehaus.jackson.annotate.JsonCreator; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * A mix-in Jackson class to convert Samza's JobModel to/from JSON. + */ +public abstract class JsonJobModelMixIn { + @JsonCreator + public JsonJobModelMixIn(@JsonProperty("config") Config config, @JsonProperty("containers") Map<Integer, ContainerModel> containers) { + } + + @JsonProperty("config") + abstract Config getConfig(); + + @JsonProperty("containers") + abstract Map<Integer, ContainerModel> getContainers(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java new file mode 100644 index 0000000..7dc431c --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.serializers.model; + +import java.util.Set; +import org.apache.samza.Partition; +import org.apache.samza.container.TaskName; +import org.apache.samza.system.SystemStreamPartition; +import org.codehaus.jackson.annotate.JsonCreator; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * A mix-in Jackson class to convert Samza's TaskModel to/from JSON. + */ +public abstract class JsonTaskModelMixIn { + @JsonCreator + public JsonTaskModelMixIn(@JsonProperty("task-name") TaskName taskName, @JsonProperty("system-stream-partitions") Set<SystemStreamPartition> systemStreamPartitions, @JsonProperty("changelog-partition") Partition changelogPartition) { + } + + @JsonProperty("task-name") + abstract TaskName getTaskName(); + + @JsonProperty("system-stream-partitions") + abstract Set<SystemStreamPartition> getSystemStreamPartitions(); + + @JsonProperty("changelog-partition") + abstract Partition getChangelogPartition(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java new file mode 100644 index 0000000..3517912 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.serializers.model; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import org.apache.samza.Partition; +import org.apache.samza.config.Config; +import org.apache.samza.config.MapConfig; +import org.apache.samza.container.TaskName; +import org.apache.samza.job.model.ContainerModel; +import org.apache.samza.job.model.JobModel; +import org.apache.samza.job.model.TaskModel; +import org.apache.samza.system.SystemStreamPartition; +import org.codehaus.jackson.JsonGenerator; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.JsonParser; +import org.codehaus.jackson.JsonProcessingException; +import org.codehaus.jackson.ObjectCodec; +import org.codehaus.jackson.Version; +import org.codehaus.jackson.map.DeserializationContext; +import org.codehaus.jackson.map.JsonDeserializer; +import org.codehaus.jackson.map.JsonSerializer; +import org.codehaus.jackson.map.MapperConfig; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.PropertyNamingStrategy; +import org.codehaus.jackson.map.SerializerProvider; +import org.codehaus.jackson.map.introspect.AnnotatedField; +import org.codehaus.jackson.map.introspect.AnnotatedMethod; +import org.codehaus.jackson.map.module.SimpleModule; +import org.codehaus.jackson.type.TypeReference; + +/** + * <p> + * A collection of utility classes and (de)serializers to make Samza's job model + * work with Jackson. Rather than annotating Samza's job model directly, the + * Jackson-specific code is isolated so that Samza's core data model does not + * require a direct dependency on Jackson. + * </p> + * + * <p> + * To use Samza's job data model, use the SamzaObjectMapper.getObjectMapper() + * method. + * </p> + */ +public class SamzaObjectMapper { + private static final ObjectMapper OBJECT_MAPPER = getObjectMapper(); + + /** + * @return Returns a new ObjectMapper that's been configured to (de)serialize + * Samza's job data model, and simple data types such as TaskName, + * Partition, Config, and SystemStreamPartition. + */ + public static ObjectMapper getObjectMapper() { + ObjectMapper mapper = new ObjectMapper(); + SimpleModule module = new SimpleModule("SamzaModule", new Version(1, 0, 0, "")); + + // Setup custom serdes for simple data types. + module.addSerializer(Partition.class, new PartitionSerializer()); + module.addSerializer(SystemStreamPartition.class, new SystemStreamPartitionSerializer()); + module.addSerializer(TaskName.class, new TaskNameSerializer()); + module.addDeserializer(Partition.class, new PartitionDeserializer()); + module.addDeserializer(SystemStreamPartition.class, new SystemStreamPartitionDeserializer()); + module.addDeserializer(Config.class, new ConfigDeserializer()); + + // Setup mixins for data models. + mapper.getSerializationConfig().addMixInAnnotations(TaskModel.class, JsonTaskModelMixIn.class); + mapper.getDeserializationConfig().addMixInAnnotations(TaskModel.class, JsonTaskModelMixIn.class); + mapper.getSerializationConfig().addMixInAnnotations(ContainerModel.class, JsonContainerModelMixIn.class); + mapper.getDeserializationConfig().addMixInAnnotations(ContainerModel.class, JsonContainerModelMixIn.class); + mapper.getSerializationConfig().addMixInAnnotations(JobModel.class, JsonJobModelMixIn.class); + mapper.getDeserializationConfig().addMixInAnnotations(JobModel.class, JsonJobModelMixIn.class); + + // Convert camel case to hyphenated field names, and register the module. 
+ mapper.setPropertyNamingStrategy(new CamelCaseToDashesStrategy()); + mapper.registerModule(module); + + return mapper; + } + + public static class ConfigDeserializer extends JsonDeserializer<Config> { + @Override + public Config deserialize(JsonParser jsonParser, DeserializationContext context) throws IOException, JsonProcessingException { + ObjectCodec oc = jsonParser.getCodec(); + JsonNode node = oc.readTree(jsonParser); + return new MapConfig(OBJECT_MAPPER.<Map<String, String>> readValue(node, new TypeReference<Map<String, String>>() { + })); + } + } + + public static class PartitionSerializer extends JsonSerializer<Partition> { + @Override + public void serialize(Partition partition, JsonGenerator jsonGenerator, SerializerProvider provider) throws IOException, JsonProcessingException { + jsonGenerator.writeObject(Integer.valueOf(partition.getPartitionId())); + } + } + + public static class PartitionDeserializer extends JsonDeserializer<Partition> { + @Override + public Partition deserialize(JsonParser jsonParser, DeserializationContext context) throws IOException, JsonProcessingException { + ObjectCodec oc = jsonParser.getCodec(); + JsonNode node = oc.readTree(jsonParser); + return new Partition(node.getIntValue()); + } + } + + public static class TaskNameSerializer extends JsonSerializer<TaskName> { + @Override + public void serialize(TaskName taskName, JsonGenerator jsonGenerator, SerializerProvider provider) throws IOException, JsonProcessingException { + jsonGenerator.writeObject(taskName.toString()); + } + } + + public static class TaskNameDeserializer extends JsonDeserializer<TaskName> { + @Override + public TaskName deserialize(JsonParser jsonParser, DeserializationContext context) throws IOException, JsonProcessingException { + ObjectCodec oc = jsonParser.getCodec(); + JsonNode node = oc.readTree(jsonParser); + return new TaskName(node.getTextValue()); + } + } + + public static class SystemStreamPartitionSerializer extends JsonSerializer<SystemStreamPartition> { + @Override + public void serialize(SystemStreamPartition systemStreamPartition, JsonGenerator jsonGenerator, SerializerProvider provider) throws IOException, JsonProcessingException { + Map<String, Object> systemStreamPartitionMap = new HashMap<String, Object>(); + systemStreamPartitionMap.put("system", systemStreamPartition.getSystem()); + systemStreamPartitionMap.put("stream", systemStreamPartition.getStream()); + systemStreamPartitionMap.put("partition", systemStreamPartition.getPartition()); + jsonGenerator.writeObject(systemStreamPartitionMap); + } + } + + public static class SystemStreamPartitionDeserializer extends JsonDeserializer<SystemStreamPartition> { + @Override + public SystemStreamPartition deserialize(JsonParser jsonParser, DeserializationContext context) throws IOException, JsonProcessingException { + ObjectCodec oc = jsonParser.getCodec(); + JsonNode node = oc.readTree(jsonParser); + String system = node.get("system").getTextValue(); + String stream = node.get("stream").getTextValue(); + Partition partition = new Partition(node.get("partition").getIntValue()); + return new SystemStreamPartition(system, stream, partition); + } + } + + /** + * A Jackson property naming strategy that converts camel case JSON fields to + * hyphenated names. For example, myVariableName would be converted to + * my-variable-name. 
+ */ + public static class CamelCaseToDashesStrategy extends PropertyNamingStrategy { + @Override + public String nameForField(MapperConfig<?> config, AnnotatedField field, String defaultName) { + return convert(defaultName); + } + + @Override + public String nameForGetterMethod(MapperConfig<?> config, AnnotatedMethod method, String defaultName) { + return convert(defaultName); + } + + @Override + public String nameForSetterMethod(MapperConfig<?> config, AnnotatedMethod method, String defaultName) { + return convert(defaultName); + } + + public String convert(String defaultName) { + StringBuilder builder = new StringBuilder(); + char[] arr = defaultName.toCharArray(); + for (int i = 0; i < arr.length; ++i) { + if (Character.isUpperCase(arr[i])) { + builder.append("-" + Character.toLowerCase(arr[i])); + } else { + builder.append(arr[i]); + } + } + return builder.toString(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala index 64a5078..ddc30af 100644 --- a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala +++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala @@ -32,6 +32,7 @@ import org.apache.samza.util.{CommandLine, Util} import org.apache.samza.{Partition, SamzaException} import scala.collection.JavaConversions._ import org.apache.samza.util.Logging +import org.apache.samza.coordinator.JobCoordinator /** * Command-line tool for inspecting and manipulating the checkpoints for a job. @@ -136,7 +137,13 @@ class CheckpointTool(config: Config, newOffsets: TaskNameToCheckpointMap) extend info("Using %s" format manager) // Find all the TaskNames that would be generated for this job config - val taskNames = Util.assignContainerToSSPTaskNames(config, 1).get(0).get.keys.toSet + val coordinator = JobCoordinator(config, 1) + val taskNames = coordinator + .jobModel + .getContainers + .values + .flatMap(_.getTasks.keys) + .toSet taskNames.foreach(manager.register) manager.start http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/scala/org/apache/samza/config/serializers/JsonConfigSerializer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/serializers/JsonConfigSerializer.scala b/samza-core/src/main/scala/org/apache/samza/config/serializers/JsonConfigSerializer.scala deleted file mode 100644 index 60e65ea..0000000 --- a/samza-core/src/main/scala/org/apache/samza/config/serializers/JsonConfigSerializer.scala +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.config.serializers -import scala.collection.JavaConversions._ - -import org.codehaus.jackson.map.ObjectMapper - -import org.apache.samza.config.Config -import org.apache.samza.config.MapConfig - -import java.util.HashMap - -object JsonConfigSerializer { - val jsonMapper = new ObjectMapper() - - def fromJson(string: String): Config = { - val map = jsonMapper.readValue(string, classOf[HashMap[String, String]]) - new MapConfig(map) - } - - def toJson(config: Config) = jsonMapper.writeValueAsString(new HashMap[String, String](config)) -} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index d0c9004..5885a88 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -31,7 +31,6 @@ import org.apache.samza.config.StorageConfig.Config2Storage import org.apache.samza.config.StreamConfig.Config2Stream import org.apache.samza.config.SystemConfig.Config2System import org.apache.samza.config.TaskConfig.Config2Task -import org.apache.samza.config.serializers.JsonConfigSerializer import org.apache.samza.metrics.JmxServer import org.apache.samza.metrics.JvmMetrics import org.apache.samza.metrics.MetricsRegistryMap @@ -60,9 +59,12 @@ import org.apache.samza.task.TaskLifecycleListenerFactory import org.apache.samza.util.Logging import org.apache.samza.util.Util import scala.collection.JavaConversions._ -import org.apache.samza.util.JsonHelpers import java.net.URL import org.apache.samza.coordinator.server.JobServlet +import org.apache.samza.job.model.ContainerModel +import org.apache.samza.coordinator.JobCoordinator +import org.apache.samza.serializers.model.SamzaObjectMapper +import org.apache.samza.job.model.JobModel object SamzaContainer extends Logging { def main(args: Array[String]) { @@ -76,9 +78,11 @@ object SamzaContainer extends Logging { try { val containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID).toInt val coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL) - val (config, sspTaskNames, taskNameToChangeLogPartitionMapping) = getCoordinatorObjects(coordinatorUrl) + val jobModel = readJobModel(coordinatorUrl) + val containerModel = jobModel.getContainers()(containerId.toInt) + val config = jobModel.getConfig - SamzaContainer(containerId, sspTaskNames(containerId), taskNameToChangeLogPartitionMapping, config).run + SamzaContainer(containerModel, config).run } finally { jmxServer.stop } @@ -89,34 +93,41 @@ object SamzaContainer extends Logging { * assignments, and returns objects to be used for SamzaContainer's * constructor. 
*/ - def getCoordinatorObjects(coordinatorUrl: String) = { - info("Fetching configuration from: %s" format coordinatorUrl) - val rawCoordinatorObjects = JsonHelpers.deserializeCoordinatorBody(Util.read(new URL(coordinatorUrl))) - val rawConfig = rawCoordinatorObjects.get(JobServlet.CONFIG).asInstanceOf[java.util.Map[String, String]] - val rawContainers = rawCoordinatorObjects.get(JobServlet.CONTAINERS).asInstanceOf[java.util.Map[String, java.util.Map[String, java.util.List[java.util.Map[String, Object]]]]] - val rawTaskChangelogMapping = rawCoordinatorObjects.get(JobServlet.TASK_CHANGELOG_MAPPING).asInstanceOf[java.util.Map[String, java.lang.Integer]] - val config = JsonHelpers.convertCoordinatorConfig(rawConfig) - val sspTaskNames = JsonHelpers.convertCoordinatorSSPTaskNames(rawContainers) - val taskNameToChangeLogPartitionMapping = JsonHelpers.convertCoordinatorTaskNameChangelogPartitions(rawTaskChangelogMapping) - (config, sspTaskNames, taskNameToChangeLogPartitionMapping) + def readJobModel(url: String) = { + info("Fetching configuration from: %s" format url) + SamzaObjectMapper + .getObjectMapper + .readValue(Util.read(new URL(url)), classOf[JobModel]) } - def apply(containerId: Int, sspTaskNames: TaskNamesToSystemStreamPartitions, taskNameToChangeLogPartitionMapping: Map[TaskName, Int], config: Config) = { + def apply(containerModel: ContainerModel, config: Config) = { + val containerId = containerModel.getContainerId val containerName = "samza-container-%s" format containerId val containerPID = Util.getContainerPID info("Setting up Samza container: %s" format containerName) info("Samza container PID: %s" format containerPID) info("Using configuration: %s" format config) - info("Using tasks: %s" format sspTaskNames) - info("Using task changelogs: %s" format taskNameToChangeLogPartitionMapping) + info("Using container model: %s" format containerModel) val registry = new MetricsRegistryMap(containerName) val samzaContainerMetrics = new SamzaContainerMetrics(containerName, registry) val systemProducersMetrics = new SystemProducersMetrics(registry) val systemConsumersMetrics = new SystemConsumersMetrics(registry) - val inputSystems = sspTaskNames.getAllSystems() + val inputSystemStreamPartitions = containerModel + .getTasks + .values + .flatMap(_.getSystemStreamPartitions) + .toSet + + val inputSystemStreams = inputSystemStreamPartitions + .map(_.getSystemStream) + .toSet + + val inputSystems = inputSystemStreams + .map(_.getSystem) + .toSet val systemNames = config.getSystemNames @@ -144,7 +155,7 @@ object SamzaContainer extends Logging { info("Got system factories: %s" format systemFactories.keys) val streamMetadataCache = new StreamMetadataCache(systemAdmins) - val inputStreamMetadata = streamMetadataCache.getStreamMetadata(sspTaskNames.getAllSystemStreams) + val inputStreamMetadata = streamMetadataCache.getStreamMetadata(inputSystemStreams) info("Got input stream metadata: %s" format inputStreamMetadata) @@ -213,7 +224,7 @@ object SamzaContainer extends Logging { * A Helper function to build a Map[SystemStream, Serde] for streams defined in the config. This is useful to build both key and message serde maps. 
*/ val buildSystemStreamSerdeMap = (getSerdeName: (SystemStream) => Option[String]) => { - (serdeStreams ++ sspTaskNames.getAllSSPs()) + (serdeStreams ++ inputSystemStreamPartitions) .filter(systemStream => getSerdeName(systemStream).isDefined) .map(systemStream => { val serdeName = getSerdeName(systemStream).get @@ -380,12 +391,18 @@ object SamzaContainer extends Logging { // Wire up all task-instance-level (unshared) objects. - val taskNames = sspTaskNames.keys.toSet + val taskNames = containerModel + .getTasks + .values + .map(_.getTaskName) + .toSet val containerContext = new SamzaContainerContext(containerId, config, taskNames) - val taskInstances: Map[TaskName, TaskInstance] = taskNames.map(taskName => { - debug("Setting up task instance: %s" format taskName) + val taskInstances: Map[TaskName, TaskInstance] = containerModel.getTasks.values.map(taskModel => { + debug("Setting up task instance: %s" format taskModel) + + val taskName = taskModel.getTaskName val task = Util.getObj[StreamTask](taskClassName) @@ -404,13 +421,11 @@ object SamzaContainer extends Logging { info("Got store consumers: %s" format storeConsumers) - val partitionForThisTaskName = new Partition(taskNameToChangeLogPartitionMapping(taskName)) - val taskStores = storageEngineFactories .map { case (storeName, storageEngineFactory) => val changeLogSystemStreamPartition = if (changeLogSystemStreams.contains(storeName)) { - new SystemStreamPartition(changeLogSystemStreams(storeName), partitionForThisTaskName) + new SystemStreamPartition(changeLogSystemStreams(storeName), taskModel.getChangelogPartition) } else { null } @@ -437,7 +452,7 @@ object SamzaContainer extends Logging { info("Got task stores: %s" format taskStores) - val changeLogOldestOffsets = getChangeLogOldestOffsetsForPartition(partitionForThisTaskName, changeLogMetadata) + val changeLogOldestOffsets = getChangeLogOldestOffsetsForPartition(taskModel.getChangelogPartition, changeLogMetadata) info("Assigning oldest change log offsets for taskName %s: %s" format (taskName, changeLogOldestOffsets)) @@ -448,9 +463,11 @@ object SamzaContainer extends Logging { changeLogSystemStreams = changeLogSystemStreams, changeLogOldestOffsets = changeLogOldestOffsets, storeBaseDir = storeBaseDir, - partitionForThisTaskName) + partition = taskModel.getChangelogPartition) - val systemStreamPartitions: Set[SystemStreamPartition] = sspTaskNames.getOrElse(taskName, throw new SamzaException("Can't find taskName " + taskName + " in map of SystemStreamPartitions: " + sspTaskNames)) + val systemStreamPartitions = taskModel + .getSystemStreamPartitions + .toSet info("Retrieved SystemStreamPartitions " + systemStreamPartitions + " for " + taskName) http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/scala/org/apache/samza/container/TaskNamesToSystemStreamPartitions.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskNamesToSystemStreamPartitions.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskNamesToSystemStreamPartitions.scala deleted file mode 100644 index da15346..0000000 --- a/samza-core/src/main/scala/org/apache/samza/container/TaskNamesToSystemStreamPartitions.scala +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.container - -import org.apache.samza.util.Logging -import org.apache.samza.SamzaException -import org.apache.samza.system.{SystemStream, SystemStreamPartition} -import scala.collection.{immutable, Map, MapLike} - -/** - * Map of {@link TaskName} to its set of {@link SystemStreamPartition}s with additional methods for aggregating - * those SystemStreamPartitions' individual system, streams and partitions. Is useful for highlighting this - * particular, heavily used map within the code. - * - * @param m Original map of TaskNames to SystemStreamPartitions - */ -class TaskNamesToSystemStreamPartitions(m:Map[TaskName, Set[SystemStreamPartition]] = Map[TaskName, Set[SystemStreamPartition]]()) - extends Map[TaskName, Set[SystemStreamPartition]] - with MapLike[TaskName, Set[SystemStreamPartition], TaskNamesToSystemStreamPartitions] with Logging { - - // Constructor - validate - - // Methods - - // TODO: Get rid of public constructor, rely entirely on the companion object - override def -(key: TaskName): TaskNamesToSystemStreamPartitions = new TaskNamesToSystemStreamPartitions(m - key) - - override def +[B1 >: Set[SystemStreamPartition]](kv: (TaskName, B1)): Map[TaskName, B1] = new TaskNamesToSystemStreamPartitions(m + kv.asInstanceOf[(TaskName, Set[SystemStreamPartition])]) - - override def iterator: Iterator[(TaskName, Set[SystemStreamPartition])] = m.iterator - - override def get(key: TaskName): Option[Set[SystemStreamPartition]] = m.get(key) - - override def empty: TaskNamesToSystemStreamPartitions = new TaskNamesToSystemStreamPartitions() - - override def seq: Map[TaskName, Set[SystemStreamPartition]] = m.seq - - override def foreach[U](f: ((TaskName, Set[SystemStreamPartition])) => U): Unit = m.foreach(f) - - override def size: Int = m.size - - /** - * Validate that this is a legal mapping of TaskNames to SystemStreamPartitions. At the moment, - * we only check that an SSP is included in the mapping at most once. We could add other, - * pluggable validations here, or if we decided to allow an SSP to appear in the mapping more than - * once, remove this limitation. - */ - def validate():Unit = { - // Convert sets of SSPs to lists, to preserve duplicates - val allSSPs: List[SystemStreamPartition] = m.values.toList.map(_.toList).flatten - val sspCountMap = allSSPs.groupBy(ssp => ssp) // Group all the SSPs together - .map(ssp => (ssp._1 -> ssp._2.size)) // Turn into map -> count of that SSP - .filter(ssp => ssp._2 != 1) // Filter out those that appear once - - if(!sspCountMap.isEmpty) { - throw new SamzaException("Assigning the same SystemStreamPartition to multiple TaskNames is not currently supported." + - " Out of compliance SystemStreamPartitions and counts: " + sspCountMap) - } - - debug("Successfully validated TaskName to SystemStreamPartition set mapping:" + m) - } - - /** - * Return a set of all the SystemStreamPartitions for all the keys. 
- * - * @return All SystemStreamPartitions within this map - */ - def getAllSSPs(): Iterable[SystemStreamPartition] = m.values.flatten - - /** - * Return a set of all the Systems presents in the SystemStreamPartitions across all the keys - * - * @return All Systems within this map - */ - def getAllSystems(): Set[String] = getAllSSPs.map(_.getSystemStream.getSystem).toSet - - /** - * Return a set of all the Partition IDs in the SystemStreamPartitions across all the keys - * - * @return All Partition IDs within this map - */ - def getAllPartitionIds(): Set[Int] = getAllSSPs.map(_.getPartition.getPartitionId).toSet - - /** - * Return a set of all the Streams in the SystemStreamPartitions across all the keys - * - * @return All Streams within this map - */ - def getAllStreams(): Set[String] = getAllSSPs.map(_.getSystemStream.getStream).toSet - - /** - * Return a set of all the SystemStreams in the SystemStreamPartitions across all the keys - * - * @return All SystemStreams within this map - */ - def getAllSystemStreams: Set[SystemStream] = getAllSSPs().map(_.getSystemStream).toSet - - // CommandBuilder needs to get a copy of this map and is a Java interface, therefore we can't just go straight - // from this type to JSON (for passing into the command option. - // Not super crazy about having the Java -> Scala and Scala -> Java methods in two different (but close) places: - // here and in the apply method on the companion object. May be better to just have a conversion util, but would - // be less clean. Life is cruel on the border of Scalapolis and Javatown. - def getJavaFriendlyType: java.util.Map[TaskName, java.util.Set[SystemStreamPartition]] = { - import scala.collection.JavaConverters._ - - m.map({case(k,v) => k -> v.asJava}).toMap.asJava - } -} - -object TaskNamesToSystemStreamPartitions { - def apply() = new TaskNamesToSystemStreamPartitions() - - def apply(m: Map[TaskName, Set[SystemStreamPartition]]) = new TaskNamesToSystemStreamPartitions(m) - - /** - * Convert from Java-happy type we obtain from the SSPTaskName factory - * - * @param m Java version of a map of sets of strings - * @return Populated SSPTaskName map - */ - def apply(m: java.util.Map[TaskName, java.util.Set[SystemStreamPartition]]) = { - import scala.collection.JavaConversions._ - - val rightType: immutable.Map[TaskName, Set[SystemStreamPartition]] = m.map({case(k,v) => k -> v.toSet}).toMap - - new TaskNamesToSystemStreamPartitions(rightType) - } -} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala b/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala index 7a3ba46..8071fec 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala @@ -16,35 +16,42 @@ * specific language governing permissions and limitations * under the License. 
*/ + package org.apache.samza.container.grouper.task -import org.apache.samza.container.{TaskName, TaskNamesToSystemStreamPartitions} +import org.apache.samza.container.TaskName +import org.apache.samza.job.model.TaskModel +import org.apache.samza.job.model.ContainerModel import org.apache.samza.system.SystemStreamPartition +import scala.collection.JavaConversions._ /** - * Group the SSP taskNames by dividing the number of taskNames into the number of containers (n) and assigning n taskNames - * to each container as returned by iterating over the keys in the map of taskNames (whatever that ordering happens to be). - * No consideration is given towards locality, even distribution of aggregate SSPs within a container, even distribution - * of the number of taskNames between containers, etc. + * Group the SSP taskNames by dividing the number of taskNames into the number + * of containers (n) and assigning n taskNames to each container as returned by + * iterating over the keys in the map of taskNames (whatever that ordering + * happens to be). No consideration is given towards locality, even distribution + * of aggregate SSPs within a container, even distribution of the number of + * taskNames between containers, etc. */ -class GroupByContainerCount(numContainers:Int) extends TaskNameGrouper { +class GroupByContainerCount(numContainers: Int) extends TaskNameGrouper { require(numContainers > 0, "Must have at least one container") - override def groupTaskNames(taskNames: TaskNamesToSystemStreamPartitions): Map[Int, TaskNamesToSystemStreamPartitions] = { - val keySize = taskNames.keySet.size - require(keySize > 0, "Must have some SSPs to group, but found none") - - // Iterate through the taskNames, round-robining them per container - val byContainerNum = (0 until numContainers).map(_ -> scala.collection.mutable.Map[TaskName, Set[SystemStreamPartition]]()).toMap - var idx = 0 - for(taskName <- taskNames.iterator) { - val currMap = byContainerNum.get(idx).get // safe to use simple get since we populated everybody above - idx = (idx + 1) % numContainers - - currMap += taskName - } + override def group(tasks: Set[TaskModel]): Set[ContainerModel] = { + require(tasks.size > 0, "No tasks found. Likely due to no input partitions. Can't run a job with no tasks.") + require(tasks.size >= numContainers, "Your container count (%s) is larger than your task count (%s). Can't have containers with nothing to do, so aborting." format (numContainers, tasks.size)) - byContainerNum.map(kv => kv._1 -> TaskNamesToSystemStreamPartitions(kv._2)).toMap + tasks + .toList + // Sort tasks by taskName. + .sortWith { case (task1, task2) => task1.compareTo(task2) < 0 } + // Assign every task an ID. + .zip(0 until tasks.size) + // Map every task to a container using its task ID. + .groupBy(_._2 % numContainers) + // Take just TaskModel and remove task IDs. 
+ .mapValues(_.map { case (task, taskId) => (task.getTaskName, task) }.toMap) + .map { case (containerId, tasks) => new ContainerModel(containerId, tasks) } + .toSet } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/scala/org/apache/samza/container/grouper/task/TaskNameGrouper.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/grouper/task/TaskNameGrouper.scala b/samza-core/src/main/scala/org/apache/samza/container/grouper/task/TaskNameGrouper.scala index 46e75b1..62e94ea 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/grouper/task/TaskNameGrouper.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/grouper/task/TaskNameGrouper.scala @@ -18,23 +18,34 @@ */ package org.apache.samza.container.grouper.task -import org.apache.samza.container.TaskNamesToSystemStreamPartitions +import org.apache.samza.job.model.TaskModel +import org.apache.samza.job.model.ContainerModel /** - * After the input SystemStreamPartitions have been mapped to their TaskNames by an implementation of - * {@link org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper}, we can then map those groupings onto - * the {@link org.apache.samza.container.SamzaContainer}s on which they will run. This class takes - * those groupings-of-SSPs and groups them together on which container each should run on. A simple - * implementation could assign each TaskNamesToSystemStreamPartition to a separate container. More - * advanced implementations could examine the TaskNamesToSystemStreamPartition to group by them - * by data locality, anti-affinity, even distribution of expected bandwidth consumption, etc. + * <p> + * After the input SystemStreamPartitions have been mapped to their tasks by an + * implementation of + * {@link org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper} + * , we can then map those groupings into the + * {@link org.apache.samza.container.SamzaContainer}s on which they will run. + * This class takes a set of TaskModels and groups them together into + * ContainerModels. All tasks within a single ContainerModel will be executed in + * a single SamzaContainer. + * </p> + * + * <p> + * A simple implementation could assign each TaskModel to a separate container. + * More advanced implementations could examine the TaskModel to group them by + * data locality, anti-affinity, even distribution of expected bandwidth + * consumption, etc. + * </p> */ trait TaskNameGrouper { /** - * Group TaskNamesToSystemStreamPartitions onto the containers they will share + * Group tasks into the containers they will share. * - * @param taskNames Pre-grouped SSPs - * @return Mapping of container ID to set if TaskNames it will run + * @param tasks Set of tasks to group into containers. + * @return Set of containers, which contain the tasks that were passed in. 
*/ - def groupTaskNames(taskNames: TaskNamesToSystemStreamPartitions): Map[Int, TaskNamesToSystemStreamPartitions] + def group(tasks: Set[TaskModel]): Set[ContainerModel] } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala new file mode 100644 index 0000000..c14f2f6 --- /dev/null +++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.coordinator + +import org.apache.samza.config.Config +import org.apache.samza.job.model.JobModel +import org.apache.samza.SamzaException +import org.apache.samza.container.grouper.task.GroupByContainerCount +import org.apache.samza.util.Util +import org.apache.samza.checkpoint.CheckpointManagerFactory +import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory +import java.util +import org.apache.samza.container.TaskName +import org.apache.samza.util.Logging +import org.apache.samza.config.TaskConfig.Config2Task +import org.apache.samza.metrics.MetricsRegistryMap +import org.apache.samza.config.StorageConfig.Config2Storage +import scala.collection.JavaConversions._ +import org.apache.samza.config.JobConfig.Config2Job +import org.apache.samza.config.TaskConfig.Config2Task +import org.apache.samza.config.SystemConfig.Config2System +import org.apache.samza.Partition +import org.apache.samza.job.model.TaskModel +import org.apache.samza.system.StreamMetadataCache +import org.apache.samza.system.SystemStreamPartition +import org.apache.samza.serializers.model.SamzaObjectMapper +import java.net.URL +import org.apache.samza.system.SystemFactory +import org.apache.samza.coordinator.server.HttpServer +import org.apache.samza.checkpoint.CheckpointManager +import org.apache.samza.coordinator.server.JobServlet + +object JobCoordinator extends Logging { + /** + * Build a JobCoordinator using a Samza job's configuration. + */ + def apply(config: Config, containerCount: Int) = { + val jobModel = buildJobModel(config, containerCount) + val server = new HttpServer + server.addServlet("/*", new JobServlet(jobModel)) + new JobCoordinator(jobModel, server) + } + + /** + * Gets a CheckpointManager from the configuration. 
+ */ + def getCheckpointManager(config: Config) = { + config.getCheckpointManagerFactory match { + case Some(checkpointFactoryClassName) => + Util + .getObj[CheckpointManagerFactory](checkpointFactoryClassName) + .getCheckpointManager(config, new MetricsRegistryMap) + case _ => + if (!config.getStoreNames.isEmpty) { + throw new SamzaException("Storage factories configured, but no checkpoint manager has been specified. " + + "Unable to start job as there would be no place to store changelog partition mapping.") + } + null + } + } + + /** + * For each input stream specified in config, exactly determine its + * partitions, returning a set of SystemStreamPartitions containing them all. + */ + def getInputStreamPartitions(config: Config) = { + val inputSystemStreams = config.getInputStreams + val systemNames = config.getSystemNames.toSet + + // Map the name of each system to the corresponding SystemAdmin + val systemAdmins = systemNames.map(systemName => { + val systemFactoryClassName = config + .getSystemFactory(systemName) + .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemName)) + val systemFactory = Util.getObj[SystemFactory](systemFactoryClassName) + systemName -> systemFactory.getAdmin(systemName, config) + }).toMap + + // Get the set of partitions for each SystemStream from the stream metadata + new StreamMetadataCache(systemAdmins) + .getStreamMetadata(inputSystemStreams) + .flatMap { + case (systemStream, metadata) => + metadata + .getSystemStreamPartitionMetadata + .keys + .map(new SystemStreamPartition(systemStream, _)) + }.toSet + } + + /** + * Gets a SystemStreamPartitionGrouper object from the configuration. + */ + def getSystemStreamPartitionGrouper(config: Config) = { + val factoryString = config.getSystemStreamPartitionGrouperFactory + val factory = Util.getObj[SystemStreamPartitionGrouperFactory](factoryString) + factory.getSystemStreamPartitionGrouper(config) + } + + /** + * Build a full Samza job model using the job configuration. + */ + def buildJobModel(config: Config, containerCount: Int) = { + // TODO containerCount should go away when we generalize the job coordinator, + // and have a non-yarn-specific way of specifying container count. + val checkpointManager = getCheckpointManager(config) + val allSystemStreamPartitions = getInputStreamPartitions(config) + val grouper = getSystemStreamPartitionGrouper(config) + val previousChangelogeMapping = if (checkpointManager != null) { + checkpointManager.start + checkpointManager.readChangeLogPartitionMapping + } else { + new util.HashMap[TaskName, java.lang.Integer]() + } + var maxChangelogPartitionId = previousChangelogeMapping + .values + .map(_.toInt) + .toList + .sorted + .lastOption + .getOrElse(-1) + + // Assign all SystemStreamPartitions to TaskNames. + val taskModels = { + val groups = grouper.group(allSystemStreamPartitions) + info("SystemStreamPartitionGrouper " + grouper + " has grouped the SystemStreamPartitions into the following taskNames:") + groups + .map { + case (taskName, systemStreamPartitions) => + val changelogPartition = Option(previousChangelogeMapping.get(taskName)) match { + case Some(changelogPartitionId) => new Partition(changelogPartitionId) + case _ => + // If we've never seen this TaskName before, then assign it a + // new changelog. + maxChangelogPartitionId += 1 + info("New task %s is being assigned changelog partition %s." 
+ format (taskName, maxChangelogPartitionId)) + new Partition(maxChangelogPartitionId) + } + new TaskModel(taskName, systemStreamPartitions, changelogPartition) + } + .toSet + } + + // Save the changelog mapping back to the checkpoint manager. + if (checkpointManager != null) { + // newChangelogMapping is the merging of all current task:changelog + // assignments with whatever we had before (previousChangelogeMapping). + // We must persist legacy changelog assignments so that + // maxChangelogPartitionId always has the absolute max, not the current + // max (in case the task with the highest changelog partition mapping + // disappears). + val newChangelogMapping = taskModels.map(taskModel => { + taskModel.getTaskName -> Integer.valueOf(taskModel.getChangelogPartition.getPartitionId) + }).toMap ++ previousChangelogeMapping + info("Saving task-to-changelog partition mapping: %s" format newChangelogMapping) + checkpointManager.writeChangeLogPartitionMapping(newChangelogMapping) + checkpointManager.stop + } + + // Here is where we should put in a pluggable option for the + // SSPTaskNameGrouper for locality, load-balancing, etc. + val containerGrouper = new GroupByContainerCount(containerCount) + val containerModels = containerGrouper + .group(taskModels) + .map { case (containerModel) => Integer.valueOf(containerModel.getContainerId) -> containerModel } + .toMap + + new JobModel(config, containerModels) + } +} + +/** + * <p>JobCoordinator is responsible for managing the lifecycle of a Samza job + * once it's been started. This includes starting and stopping containers, + * managing configuration, etc.</p> + * + * <p>Any new cluster manager that's integrated with Samza (YARN, Mesos, etc.) + * must integrate with the job coordinator.</p> + * + * <p>This class's API is currently unstable and likely to change. For now, the + * coordinator's only responsibility is to serve the job model over its HTTP + * server.</p> + */ +class JobCoordinator( + /** + * The data model that describes the Samza job's containers and tasks. + */ + val jobModel: JobModel, + + /** + * HTTP server used to serve a Samza job's container model to SamzaContainers when they start up. + */ + val server: HttpServer) extends Logging { + + debug("Got job model: %s." format jobModel) + + def start { + debug("Starting HTTP server.") + server.start + info("Started HTTP server: %s" format server.getUrl) + } + + def stop { + debug("Stopping HTTP server.") + server.stop + info("Stopped HTTP server.") + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala index 7c0676c..10986a4 100644 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala +++ b/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala @@ -32,23 +32,64 @@ import org.eclipse.jetty.servlet.ServletHolder import java.net.URL import org.apache.samza.util.Logging +/** + * <p>A Jetty-based HTTP server. The server allows arbitrary servlets to be added + * with the addServlet() method. 
The server is configured to automatically + serve static CSS and JS from the /css and /js directories if a + resourceBasePath is specified.</p> + */ class HttpServer( + /** + * All servlet paths will be served out of the rootPath. If rootPath is set + * to /foo, then all servlet paths will be served underneath /foo. + */ rootPath: String = "/", + + /** + * The port that Jetty should bind to. If set to 0, Jetty will bind to a + * dynamically allocated free port on the machine it's running on. The port + * can be retrieved by calling .getUrl. + */ port: Int = 0, + + /** + * If specified, tells Jetty where static resources are located inside + * WEB-INF. This allows HttpServer to serve arbitrary static files that are + * embedded in a JAR. + */ resourceBasePath: String = null, + + /** + * The ServletHolder to use for static file (CSS/JS) serving. + */ defaultHolder: ServletHolder = new ServletHolder(classOf[DefaultServlet])) extends Logging { + var running = false var servlets = Map[String, Servlet]() val server = new Server(port) val context = new ServletContextHandler(ServletContextHandler.SESSIONS) defaultHolder.setName("default") + /** + * <p> + * Add a servlet to the Jetty container. Path can be wild-carded (e.g. /\* + * or /foo/\*), and is relative to the rootPath specified in the constructor. + * </p> + * + * <p> + * Servlets with path /bar/\* and rootPath /foo will result in a location of + * http://localhost/foo/bar. + * </p> + */ def addServlet(path: String, servlet: Servlet) { debug("Adding servlet %s to path %s" format (servlet, path)) servlets += path -> servlet } + /** + * Start the Jetty server, and begin serving content. + */ def start { debug("Starting server with rootPath=%s port=%s resourceBasePath=%s" format (rootPath, port, resourceBasePath)) context.setContextPath(rootPath) @@ -70,18 +111,30 @@ class HttpServer( debug("Starting HttpServer.") server.start() + running = true info("Started HttpServer on: %s" format getUrl) } + /** + * Shut down the Jetty server. + */ def stop { + running = false debug("Stopping server") context.stop() server.stop() info("Stopped server") } + /** + * Returns the URL for the root of the HTTP server. 
This method + * can only be called while the server is running; otherwise it + * throws a SamzaException. + */ def getUrl = { - val runningPort = server.getConnectors()(0).asInstanceOf[Connector].getLocalPort() - new URL("http://" + InetAddress.getLocalHost().getHostAddress() + ":" + runningPort + rootPath) + if (running) { + val runningPort = server.getConnectors()(0).asInstanceOf[Connector].getLocalPort() + new URL("http://" + InetAddress.getLocalHost().getHostAddress() + ":" + runningPort + rootPath) + } else { + throw new SamzaException("HttpServer is not currently running, so URLs are not available for it.") + } } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala index d7841a6..635c353 100644 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala +++ b/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala @@ -19,50 +19,12 @@ package org.apache.samza.coordinator.server -import org.apache.samza.config.Config -import java.util.HashMap -import org.apache.samza.container.TaskNamesToSystemStreamPartitions -import org.apache.samza.util.JsonHelpers -import org.apache.samza.container.TaskName +import org.apache.samza.job.model.JobModel import org.apache.samza.util.Logging -object JobServlet { - val CONFIG = "config" - val CONTAINERS = "containers" - val TASK_CHANGELOG_MAPPING = "task-changelog-mappings" -} - -class JobServlet( - config: Config, - containerToTaskMapping: Map[Int, TaskNamesToSystemStreamPartitions], - taskToChangelogMapping: Map[TaskName, Int]) extends ServletBase with Logging { - import JobServlet._ - import JsonHelpers._ - - val javaSafeContainerToTaskMapping = buildTasksToSSPs - val javaSafeTaskToChangelogMappings = convertTaskNameToChangeLogPartitionMapping(taskToChangelogMapping) - val jsonMap = buildJsonMap - - debug("Built JSON map: %s" format jsonMap) - - protected def getObjectToWrite() = { - jsonMap - } - - private def buildTasksToSSPs = { - val map = new HashMap[java.lang.Integer, java.util.HashMap[TaskName, java.util.ArrayList[SSPWrapper]]] - containerToTaskMapping.foreach { - case (containerId, taskNameToSSPs) => - map.put(Integer.valueOf(containerId), convertSystemStreamPartitionSet(taskNameToSSPs.getJavaFriendlyType)) - } - map - } - - private def buildJsonMap = { - val map = new HashMap[String, Object]() - map.put(CONFIG, config) - map.put(CONTAINERS, javaSafeContainerToTaskMapping) - map.put(TASK_CHANGELOG_MAPPING, javaSafeTaskToChangelogMappings) - map - } +/** + * A servlet that dumps the job model for a Samza job. 
+ */ +class JobServlet(jobModel: JobModel) extends ServletBase with Logging { + protected def getObjectToWrite() = jobModel } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/scala/org/apache/samza/coordinator/server/ServletBase.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/server/ServletBase.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/server/ServletBase.scala index c9bad90..2732cca 100644 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/server/ServletBase.scala +++ b/samza-core/src/main/scala/org/apache/samza/coordinator/server/ServletBase.scala @@ -19,25 +19,29 @@ package org.apache.samza.coordinator.server; -import java.io.IOException; -import javax.servlet.ServletException; -import javax.servlet.http.HttpServlet; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; +import java.io.IOException +import javax.servlet.ServletException +import javax.servlet.http.HttpServlet +import javax.servlet.http.HttpServletRequest +import javax.servlet.http.HttpServletResponse import org.codehaus.jackson.map.ObjectMapper; +import org.apache.samza.serializers.model.SamzaObjectMapper -object ServletBase { - val JSON_MAPPER = new ObjectMapper() -} - +/** + * A simple servlet helper that makes it easy to dump objects to JSON. + */ trait ServletBase extends HttpServlet { - import ServletBase._ + val mapper = SamzaObjectMapper.getObjectMapper() override protected def doGet(request: HttpServletRequest, response: HttpServletResponse) { response.setContentType("application/json") response.setStatus(HttpServletResponse.SC_OK) - JSON_MAPPER.writeValue(response.getWriter(), getObjectToWrite()) + mapper.writeValue(response.getWriter(), getObjectToWrite()) } + /** + * Returns an object that should be fed to Jackson's ObjectMapper, and + * returned as an HTTP response. 
+ */ protected def getObjectToWrite(): Object } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala index bd38955..7992885 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala @@ -32,18 +32,15 @@ import java.io.InputStreamReader import java.io.InputStream import java.io.OutputStream import org.apache.samza.SamzaException -import org.apache.samza.coordinator.server.HttpServer import org.apache.samza.job.CommandBuilder import scala.collection.JavaConversions._ -class ProcessJob(commandBuilder: CommandBuilder, server: HttpServer = new HttpServer) extends StreamJob with Logging { +class ProcessJob(commandBuilder: CommandBuilder) extends StreamJob with Logging { var jobStatus: Option[ApplicationStatus] = None var process: Process = null def submit: StreamJob = { jobStatus = Some(New) - server.start - commandBuilder.setUrl(server.getUrl) val waitForThreadStart = new CountDownLatch(1) val processBuilder = new ProcessBuilder(commandBuilder.buildCommand.split(" ").toList) @@ -66,7 +63,6 @@ class ProcessJob(commandBuilder: CommandBuilder, server: HttpServer = new HttpSe errThread.start waitForThreadStart.countDown process.waitFor - shutdown } } @@ -79,7 +75,6 @@ class ProcessJob(commandBuilder: CommandBuilder, server: HttpServer = new HttpSe def kill: StreamJob = { process.destroy jobStatus = Some(UnsuccessfulFinish); - shutdown ProcessJob.this } @@ -90,7 +85,7 @@ class ProcessJob(commandBuilder: CommandBuilder, server: HttpServer = new HttpSe try { process.waitFor } catch { - case e: InterruptedException => shutdown + case e: InterruptedException => info("Got interrupt.", e) } } } @@ -112,10 +107,6 @@ class ProcessJob(commandBuilder: CommandBuilder, server: HttpServer = new HttpSe } def getStatus = jobStatus.getOrElse(null) - - private def shutdown { - server.stop - } } /** http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala index b1e5237..6985af6 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala @@ -19,7 +19,7 @@ package org.apache.samza.job.local -import org.apache.samza.container.{ TaskNamesToSystemStreamPartitions, SamzaContainer } +import org.apache.samza.container.SamzaContainer import org.apache.samza.util.Logging import org.apache.samza.SamzaException import org.apache.samza.config.Config @@ -29,48 +29,40 @@ import org.apache.samza.util.Util import scala.collection.JavaConversions._ import org.apache.samza.coordinator.server.HttpServer import org.apache.samza.coordinator.server.JobServlet +import org.apache.samza.coordinator.JobCoordinator /** - * Creates a stand alone ProcessJob with the specified config + * Creates a stand alone ProcessJob with the specified config. 
*/ class ProcessJobFactory extends StreamJobFactory with Logging { def getJob(config: Config): StreamJob = { - // Since we're local, there will only be a single task into which all the SSPs will be processed - val taskToTaskNames: Map[Int, TaskNamesToSystemStreamPartitions] = Util.assignContainerToSSPTaskNames(config, 1) - if (taskToTaskNames.size != 1) { - throw new SamzaException("Should only have a single task count but somehow got more " + taskToTaskNames.size) - } - - // So pull out that single TaskNamesToSystemStreamPartitions - val sspTaskName: TaskNamesToSystemStreamPartitions = taskToTaskNames.getOrElse(0, throw new SamzaException("Should have a 0 task number for the SSPs but somehow do not: " + taskToTaskNames)) - if (sspTaskName.size <= 0) { - throw new SamzaException("No SystemStreamPartitions to process were detected for your input streams. It's likely that the system(s) specified don't know about the input streams: %s" format config.getInputStreams) - } - - val taskNameToChangeLogPartitionMapping = Util.getTaskNameToChangeLogPartitionMapping(config, taskToTaskNames) - info("got taskName for job %s" format sspTaskName) - - val server = new HttpServer() - server.addServlet("/*", new JobServlet(config, taskToTaskNames, taskNameToChangeLogPartitionMapping)) + val coordinator = JobCoordinator(config, 1) + val containerModel = coordinator.jobModel.getContainers.get(0) - val commandBuilder: CommandBuilder = { - config.getCommandClass match { - case Some(cmdBuilderClassName) => { - // A command class was specified, so we need to use a process job to - // execute the command in its own process. - Class.forName(cmdBuilderClassName).newInstance.asInstanceOf[CommandBuilder] - } - case _ => { - info("Defaulting to ShellCommandBuilder") - new ShellCommandBuilder + try { + val commandBuilder = { + config.getCommandClass match { + case Some(cmdBuilderClassName) => { + // A command class was specified, so we need to use a process job to + // execute the command in its own process. 
+ Util.getObj[CommandBuilder](cmdBuilderClassName) + } + case _ => { + info("Defaulting to ShellCommandBuilder") + new ShellCommandBuilder + } } } - } - commandBuilder - .setConfig(config) - .setId(0) + commandBuilder + .setConfig(config) + .setId(0) - new ProcessJob(commandBuilder, server) + coordinator.start + + new ProcessJob(commandBuilder) + } finally { + coordinator.stop + } } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala index 4d5f0d5..7504e6d 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala @@ -24,31 +24,20 @@ import org.apache.samza.SamzaException import org.apache.samza.config.Config import org.apache.samza.config.ShellCommandConfig._ import org.apache.samza.config.TaskConfig._ -import org.apache.samza.container.{TaskNamesToSystemStreamPartitions, SamzaContainer} -import org.apache.samza.job.{StreamJob, StreamJobFactory} +import org.apache.samza.container.SamzaContainer +import org.apache.samza.job.{ StreamJob, StreamJobFactory } import org.apache.samza.util.Util import org.apache.samza.config.JobConfig._ +import org.apache.samza.coordinator.JobCoordinator /** * Creates a new Thread job with the given config */ class ThreadJobFactory extends StreamJobFactory with Logging { def getJob(config: Config): StreamJob = { - // Since we're local, there will only be a single task into which all the SSPs will be processed - val taskToTaskNames: Map[Int, TaskNamesToSystemStreamPartitions] = Util.assignContainerToSSPTaskNames(config, 1) - if(taskToTaskNames.size != 1) { - throw new SamzaException("Should only have a single task count but somehow got more " + taskToTaskNames.size) - } - - // So pull out that single TaskNamesToSystemStreamPartitions - val sspTaskName: TaskNamesToSystemStreamPartitions = taskToTaskNames.getOrElse(0, throw new SamzaException("Should have a 0 task number for the SSPs but somehow do not: " + taskToTaskNames)) - if (sspTaskName.size <= 0) { - throw new SamzaException("No SystemStreamPartitions to process were detected for your input streams. It's likely that the system(s) specified don't know about the input streams: %s" format config.getInputStreams) - } - - val taskNameToChangeLogPartitionMapping = Util.getTaskNameToChangeLogPartitionMapping(config, taskToTaskNames) - info("got taskName for job %s" format sspTaskName) info("Creating a ThreadJob, which is only meant for debugging.") + val coordinator = JobCoordinator(config, 1) + val containerModel = coordinator.jobModel.getContainers.get(0) // Give developers a nice friendly warning if they've specified task.opts and are using a threaded job. config.getTaskOpts match { @@ -56,8 +45,11 @@ class ThreadJobFactory extends StreamJobFactory with Logging { case _ => None } - // No command class was specified, so execute the job in this process - // using a threaded job. 
- new ThreadJob(SamzaContainer(0, sspTaskName, taskNameToChangeLogPartitionMapping, config)) + try { + coordinator.start + new ThreadJob(SamzaContainer(containerModel, config)) + } finally { + coordinator.stop + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/scala/org/apache/samza/util/JsonHelpers.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/util/JsonHelpers.scala b/samza-core/src/main/scala/org/apache/samza/util/JsonHelpers.scala deleted file mode 100644 index e3f23b6..0000000 --- a/samza-core/src/main/scala/org/apache/samza/util/JsonHelpers.scala +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.util - -import org.apache.samza.container.TaskName -import org.codehaus.jackson.map.ObjectMapper -import org.apache.samza.system.SystemStreamPartition -import org.codehaus.jackson.`type`.TypeReference -import java.util -import scala.collection.JavaConversions._ -import org.apache.samza.Partition -import scala.reflect.BeanProperty -import org.apache.samza.config.MapConfig -import org.apache.samza.container.TaskNamesToSystemStreamPartitions - -/** - * Working with Jackson and JSON in Scala is tricky. These helper methods are - * used to convert objects back and forth in SamzaContainer, and the - * JobServlet. - */ -object JsonHelpers { - // Jackson really hates Scala's classes, so we need to wrap up the SSP in a - // form Jackson will take. 
- class SSPWrapper(@BeanProperty var partition: java.lang.Integer = null, - @BeanProperty var Stream: java.lang.String = null, - @BeanProperty var System: java.lang.String = null) { - def this() { this(null, null, null) } - def this(ssp: SystemStreamPartition) { this(ssp.getPartition.getPartitionId, ssp.getSystemStream.getStream, ssp.getSystemStream.getSystem) } - } - - def convertSystemStreamPartitionSet(sspTaskNames: java.util.Map[TaskName, java.util.Set[SystemStreamPartition]]): util.HashMap[TaskName, util.ArrayList[SSPWrapper]] = { - val map = new util.HashMap[TaskName, util.ArrayList[SSPWrapper]]() - for ((key, ssps) <- sspTaskNames) { - val al = new util.ArrayList[SSPWrapper](ssps.size) - for (ssp <- ssps) { al.add(new SSPWrapper(ssp)) } - map.put(key, al) - } - map - } - - def convertTaskNameToChangeLogPartitionMapping(mapping: Map[TaskName, Int]): util.HashMap[TaskName, java.lang.Integer] = { - val javaMap = new util.HashMap[TaskName, java.lang.Integer]() - mapping.foreach(kv => javaMap.put(kv._1, Integer.valueOf(kv._2))) - javaMap - } - - def deserializeCoordinatorBody(body: String) = new ObjectMapper().readValue(body, new TypeReference[util.HashMap[String, Object]] {}).asInstanceOf[util.HashMap[String, Object]] - - def convertCoordinatorConfig(config: util.Map[String, String]) = new MapConfig(config) - - def convertCoordinatorTaskNameChangelogPartitions(taskNameToChangelogMapping: util.Map[String, java.lang.Integer]) = { - taskNameToChangelogMapping.map { - case (taskName, changelogPartitionId) => - (new TaskName(taskName), changelogPartitionId.toInt) - }.toMap - } - - // First key is containerId, second key is TaskName, third key is - // [system|stream|partition]. - def convertCoordinatorSSPTaskNames(containers: util.Map[String, util.Map[String, util.List[util.Map[String, Object]]]]): Map[Int, TaskNamesToSystemStreamPartitions] = { - containers.map { - case (containerId, tasks) => { - containerId.toInt -> new TaskNamesToSystemStreamPartitions(tasks.map { - case (taskName, ssps) => { - new TaskName(taskName) -> ssps.map { - case (sspMap) => new SystemStreamPartition( - sspMap.get("system").toString, - sspMap.get("stream").toString, - new Partition(sspMap.get("partition").toString.toInt)) - }.toSet - } - }.toMap) - } - }.toMap - } -} \ No newline at end of file
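The conversions deleted above are superseded by the new SamzaObjectMapper and its Jackson mix-ins. As a quick illustration of the replacement path, here is a minimal sketch of round-tripping a JobModel through JSON; the system/stream names and partition ids are made up for the example, and the final equality check assumes the model classes implement equals, as exercised by TestSamzaObjectMapper:

    import java.util
    import org.apache.samza.Partition
    import org.apache.samza.config.MapConfig
    import org.apache.samza.container.TaskName
    import org.apache.samza.job.model.{ ContainerModel, JobModel, TaskModel }
    import org.apache.samza.serializers.model.SamzaObjectMapper
    import org.apache.samza.system.SystemStreamPartition

    object JobModelRoundTrip extends App {
      // One task that owns a single input partition and changelog partition 0.
      val taskName = new TaskName("Partition 0")
      val ssps = new util.HashSet[SystemStreamPartition]
      ssps.add(new SystemStreamPartition("kafka", "my-stream", new Partition(0)))
      val tasks = new util.HashMap[TaskName, TaskModel]
      tasks.put(taskName, new TaskModel(taskName, ssps, new Partition(0)))

      // One container holding that task, wrapped in a JobModel with an empty config.
      val containers = new util.HashMap[java.lang.Integer, ContainerModel]
      containers.put(0, new ContainerModel(0, tasks))
      val jobModel = new JobModel(new MapConfig(new util.HashMap[String, String]), containers)

      // The mix-in-aware mapper serializes and deserializes the whole model,
      // which is what JobServlet sends and SamzaContainer reads back.
      val mapper = SamzaObjectMapper.getObjectMapper()
      val json = mapper.writeValueAsString(jobModel)
      println(json)
      println(mapper.readValue(json, classOf[JobModel]) == jobModel)
    }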

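The reworked TaskNameGrouper trait likewise invites grouping strategies beyond GroupByContainerCount. A hypothetical grouper (the name GroupByTask is invented here for illustration) implementing the "simple implementation" the trait's javadoc describes, one container per task:

    import scala.collection.JavaConversions._
    import org.apache.samza.container.grouper.task.TaskNameGrouper
    import org.apache.samza.job.model.{ ContainerModel, TaskModel }

    class GroupByTask extends TaskNameGrouper {
      def group(tasks: Set[TaskModel]): Set[ContainerModel] = {
        tasks.zipWithIndex.map {
          case (task, containerId) =>
            // One task per container; JavaConversions turns the Scala Map into
            // the java.util.Map that ContainerModel expects, mirroring
            // GroupByContainerCount above.
            new ContainerModel(containerId, Map(task.getTaskName -> task))
        }
      }
    }

Such a grouper maximizes isolation at the cost of one container per task; GroupByContainerCount makes the opposite trade by packing tasks into a fixed container count.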