SAMZA-798 : Performance and stability issue after combining checkpoint and coordinator stream
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/eba9b28f Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/eba9b28f Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/eba9b28f Branch: refs/heads/samza-sql Commit: eba9b28f34874b1d9a7e467d8a6046dc5357b4d5 Parents: 8677a27 Author: Navina <[email protected]> Authored: Mon Nov 2 13:59:42 2015 -0800 Committer: Navina <[email protected]> Committed: Mon Nov 2 13:59:42 2015 -0800 ---------------------------------------------------------------------- .../versioned/jobs/configuration-table.html | 23 + .../org/apache/samza/checkpoint/Checkpoint.java | 72 ++++ .../samza/checkpoint/CheckpointManager.java | 53 +++ .../checkpoint/CheckpointManagerFactory.java | 30 ++ .../org/apache/samza/checkpoint/Checkpoint.java | 72 ---- .../samza/checkpoint/CheckpointManager.java | 93 ---- .../stream/messages/SetCheckpoint.java | 74 ---- .../org/apache/samza/job/model/TaskModel.java | 19 +- .../serializers/model/JsonTaskModelMixIn.java | 9 +- .../samza/checkpoint/CheckpointTool.scala | 13 +- .../apache/samza/checkpoint/OffsetManager.scala | 77 ++-- .../file/FileSystemCheckpointManager.scala | 87 ++++ .../apache/samza/container/SamzaContainer.scala | 18 +- .../samza/coordinator/JobCoordinator.scala | 38 +- .../samza/migration/JobRunnerMigration.scala | 18 +- .../MockCoordinatorStreamWrappedConsumer.java | 16 +- .../model/TestSamzaObjectMapper.java | 9 +- .../samza/checkpoint/TestCheckpointTool.scala | 6 +- .../samza/checkpoint/TestOffsetManager.scala | 16 +- .../file/TestFileSystemCheckpointManager.scala | 86 ++++ .../samza/container/TestSamzaContainer.scala | 26 +- .../task/TestGroupByContainerCount.scala | 2 +- .../samza/coordinator/TestJobCoordinator.scala | 104 +---- .../org/apache/samza/job/TestJobRunner.scala | 4 +- .../apache/samza/job/local/TestProcessJob.scala | 2 +- .../old/checkpoint/KafkaCheckpointLogKey.scala | 188 -------- 
.../old/checkpoint/KafkaCheckpointManager.scala | 337 --------------- .../KafkaCheckpointManagerFactory.scala | 108 ----- .../checkpoint/KafkaCheckpointMigration.scala | 94 ---- .../kafka/KafkaCheckpointLogKey.scala | 194 +++++++++ .../kafka/KafkaCheckpointManager.scala | 320 ++++++++++++++ .../kafka/KafkaCheckpointManagerFactory.scala | 101 +++++ .../org/apache/samza/config/KafkaConfig.scala | 10 + .../migration/KafkaCheckpointMigration.scala | 147 +++++++ .../scala/org/apache/samza/util/KafkaUtil.scala | 114 ++++- .../apache/samza/util/KafkaUtilException.scala | 31 ++ .../checkpoint/TestKafkaCheckpointManager.scala | 430 ------------------- .../kafka/TeskKafkaCheckpointLogKey.scala | 71 +++ .../kafka/TestKafkaCheckpointManager.scala | 244 +++++++++++ .../TestKafkaCheckpointMigration.scala | 243 +++++++++++ .../samza/job/yarn/TestContainerAllocator.java | 2 +- .../yarn/TestHostAwareContainerAllocator.java | 2 +- .../samza/job/yarn/TestSamzaTaskManager.java | 2 +- .../job/yarn/TestSamzaAppMasterLifecycle.scala | 2 +- 44 files changed, 1974 insertions(+), 1633 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/docs/learn/documentation/versioned/jobs/configuration-table.html ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html index 4adac09..b5d3813 100644 --- a/docs/learn/documentation/versioned/jobs/configuration-table.html +++ b/docs/learn/documentation/versioned/jobs/configuration-table.html @@ -1471,6 +1471,29 @@ <td class="default">268435456</td> <td class="description">The number of bytes of outgoing messages to write to each HDFS output file before cutting a new file. 
Defaults to 256MB if not set.</td> </tr> + + <tr> + <th colspan="3" class="section" id="task-migration"> + Migrating from Samza 0.9.1 to 0.10.0<br> + <span class="subtitle"> + (This section applies if you are upgrading from Samza 0.9.1 to 0.10.0 and have set + <a href="#task-checkpoint-factory" class="property">task.checkpoint.factory</a> to anything <b> other than </b> + <code>org.apache.samza.checkpoint.file.FileSystemCheckpointManagerFactory</code>) + </span> + </th> + </tr> + + <tr> + <td class="property" id="task-checkpoint-skip-migration">task.checkpoint.skip-migration</td> + <td class="default">false</td> + <td class="description"> + When migrating from 0.9.1 to 0.10.0, the taskName-to-changelog partition mapping was moved from the checkpoint stream to the coordinator stream. <br /> + If you are using a checkpoint manager other than Kafka + (<code>org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory</code>), you have to + manually migrate the taskName-to-changelog partition mapping to the coordinator stream. <br /> + This can be done with the help of <code>checkpoint-tool.sh</code>. + </td> + </tr> </tbody> </table> </body> http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java b/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java new file mode 100644 index 0000000..593d118 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.checkpoint; + +import org.apache.samza.system.SystemStreamPartition; + +import java.util.Collections; +import java.util.Map; + +/** + * A checkpoint is a mapping of all the streams a job is consuming and the most recent current offset for each. + * It is used to restore a {@link org.apache.samza.task.StreamTask}, either as part of a job restart or as part + * of restarting a failed container within a running job. + */ +public class Checkpoint { + private final Map<SystemStreamPartition, String> offsets; + + /** + * Constructs a new checkpoint based off a map of Samza stream offsets. + * @param offsets Map of Samza streams to their current offset. + */ + public Checkpoint(Map<SystemStreamPartition, String> offsets) { + this.offsets = offsets; + } + + /** + * Gets a unmodifiable view of the current Samza stream offsets. + * @return A unmodifiable view of a Map of Samza streams to their recorded offsets. + */ + public Map<SystemStreamPartition, String> getOffsets() { + return Collections.unmodifiableMap(offsets); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof Checkpoint)) return false; + + Checkpoint that = (Checkpoint) o; + + if (offsets != null ? 
!offsets.equals(that.offsets) : that.offsets != null) return false; + + return true; + } + + @Override + public int hashCode() { + return offsets != null ? offsets.hashCode() : 0; + } + + @Override + public String toString() { + return "Checkpoint [offsets=" + offsets + "]"; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java new file mode 100644 index 0000000..dc14beb --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.checkpoint; + +import org.apache.samza.container.TaskName; + +/** + * CheckpointManagers read and write {@link org.apache.samza.checkpoint.Checkpoint} to some + * implementation-specific location. 
+ */ +public interface CheckpointManager { + void start(); + + /** + * Registers this manager to write checkpoints of a specific Samza stream partition. + * @param taskName Specific Samza taskName of which to write checkpoints for. + */ + void register(TaskName taskName); + + /** + * Writes a checkpoint based on the current state of a Samza stream partition. + * @param taskName Specific Samza taskName of which to write a checkpoint of. + * @param checkpoint Reference to a Checkpoint object to store offset data in. + */ + void writeCheckpoint(TaskName taskName, Checkpoint checkpoint); + + /** + * Returns the last recorded checkpoint for a specified taskName. + * @param taskName Specific Samza taskName for which to get the last checkpoint of. + * @return A Checkpoint object with the recorded offset data of the specified partition. + */ + Checkpoint readLastCheckpoint(TaskName taskName); + + void stop(); + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java new file mode 100644 index 0000000..fe480b5 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.checkpoint; + +import org.apache.samza.config.Config; +import org.apache.samza.metrics.MetricsRegistry; + +/** + * Build a {@link org.apache.samza.checkpoint.CheckpointManager}. + */ +public interface CheckpointManagerFactory { + public CheckpointManager getCheckpointManager(Config config, MetricsRegistry registry); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/main/java/org/apache/samza/checkpoint/Checkpoint.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/checkpoint/Checkpoint.java b/samza-core/src/main/java/org/apache/samza/checkpoint/Checkpoint.java deleted file mode 100644 index 593d118..0000000 --- a/samza-core/src/main/java/org/apache/samza/checkpoint/Checkpoint.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.checkpoint; - -import org.apache.samza.system.SystemStreamPartition; - -import java.util.Collections; -import java.util.Map; - -/** - * A checkpoint is a mapping of all the streams a job is consuming and the most recent current offset for each. - * It is used to restore a {@link org.apache.samza.task.StreamTask}, either as part of a job restart or as part - * of restarting a failed container within a running job. - */ -public class Checkpoint { - private final Map<SystemStreamPartition, String> offsets; - - /** - * Constructs a new checkpoint based off a map of Samza stream offsets. - * @param offsets Map of Samza streams to their current offset. - */ - public Checkpoint(Map<SystemStreamPartition, String> offsets) { - this.offsets = offsets; - } - - /** - * Gets a unmodifiable view of the current Samza stream offsets. - * @return A unmodifiable view of a Map of Samza streams to their recorded offsets. - */ - public Map<SystemStreamPartition, String> getOffsets() { - return Collections.unmodifiableMap(offsets); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (!(o instanceof Checkpoint)) return false; - - Checkpoint that = (Checkpoint) o; - - if (offsets != null ? !offsets.equals(that.offsets) : that.offsets != null) return false; - - return true; - } - - @Override - public int hashCode() { - return offsets != null ? 
offsets.hashCode() : 0; - } - - @Override - public String toString() { - return "Checkpoint [offsets=" + offsets + "]"; - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java b/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java deleted file mode 100644 index 0185751..0000000 --- a/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.samza.checkpoint; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import org.apache.samza.container.TaskName; -import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage; -import org.apache.samza.coordinator.stream.messages.SetCheckpoint; -import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer; -import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer; -import org.apache.samza.coordinator.stream.AbstractCoordinatorStreamManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * The CheckpointManager is used to persist and restore checkpoint information. The CheckpointManager uses - * CoordinatorStream underneath to do this. - */ -public class CheckpointManager extends AbstractCoordinatorStreamManager { - - private static final Logger log = LoggerFactory.getLogger(CheckpointManager.class); - private final Map<TaskName, Checkpoint> taskNamesToOffsets; - private final HashSet<TaskName> taskNames; - - public CheckpointManager(CoordinatorStreamSystemProducer coordinatorStreamProducer, - CoordinatorStreamSystemConsumer coordinatorStreamConsumer, - String source) { - super(coordinatorStreamProducer, coordinatorStreamConsumer, source); - taskNamesToOffsets = new HashMap<TaskName, Checkpoint>(); - taskNames = new HashSet<TaskName>(); - } - - /** - * Registers this manager to write checkpoints of a specific Samza stream partition. - * @param taskName Specific Samza taskName of which to write checkpoints for. - */ - public void register(TaskName taskName) { - log.debug("Adding taskName {} to {}", taskName, this); - taskNames.add(taskName); - registerCoordinatorStreamConsumer(); - registerCoordinatorStreamProducer(taskName.getTaskName()); - } - - /** - * Writes a checkpoint based on the current state of a Samza stream partition. - * @param taskName Specific Samza taskName of which to write a checkpoint of. 
- * @param checkpoint Reference to a Checkpoint object to store offset data in. - */ - public void writeCheckpoint(TaskName taskName, Checkpoint checkpoint) { - log.debug("Writing checkpoint for Task: {} with offsets: {}", taskName.getTaskName(), checkpoint.getOffsets()); - send(new SetCheckpoint(getSource(), taskName.getTaskName(), checkpoint)); - } - - /** - * Returns the last recorded checkpoint for a specified taskName. - * @param taskName Specific Samza taskName for which to get the last checkpoint of. - * @return A Checkpoint object with the recorded offset data of the specified partition. - */ - public Checkpoint readLastCheckpoint(TaskName taskName) { - // Bootstrap each time to make sure that we are caught up with the stream, the bootstrap will just catch up on consecutive calls - log.debug("Reading checkpoint for Task: {}", taskName.getTaskName()); - for (CoordinatorStreamMessage coordinatorStreamMessage : getBootstrappedStream(SetCheckpoint.TYPE)) { - SetCheckpoint setCheckpoint = new SetCheckpoint(coordinatorStreamMessage); - TaskName taskNameInCheckpoint = new TaskName(setCheckpoint.getKey()); - if (taskNames.contains(taskNameInCheckpoint)) { - taskNamesToOffsets.put(taskNameInCheckpoint, setCheckpoint.getCheckpoint()); - log.debug("Adding checkpoint {} for taskName {}", taskNameInCheckpoint, taskName); - } - } - return taskNamesToOffsets.get(taskName); - } - -} http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetCheckpoint.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetCheckpoint.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetCheckpoint.java deleted file mode 100644 index 21afa85..0000000 --- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetCheckpoint.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * 
Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.coordinator.stream.messages; - -import org.apache.samza.checkpoint.Checkpoint; -import org.apache.samza.system.SystemStreamPartition; -import org.apache.samza.util.Util; - -import java.util.HashMap; -import java.util.Map; - -/** - * The SetCheckpoint is used to store the checkpoint messages for a particular task. - * The structure looks like: - * { - * Key: TaskName - * Type: set-checkpoint - * Source: ContainerID - * MessageMap: - * { - * SSP1 : offset, - * SSP2 : offset - * } - * } - */ -public class SetCheckpoint extends CoordinatorStreamMessage { - public static final String TYPE = "set-checkpoint"; - - public SetCheckpoint(CoordinatorStreamMessage message) { - super(message.getKeyArray(), message.getMessageMap()); - } - - /** - * The SetCheckpoint is used to store checkpoint message for a given task. 
- * - * @param source The source writing the checkpoint - * @param key The key for the checkpoint message (Typically task name) - * @param checkpoint Checkpoint message to be written to the stream - */ - public SetCheckpoint(String source, String key, Checkpoint checkpoint) { - super(source); - setType(TYPE); - setKey(key); - Map<SystemStreamPartition, String> offsets = checkpoint.getOffsets(); - for (Map.Entry<SystemStreamPartition, String> systemStreamPartitionStringEntry : offsets.entrySet()) { - putMessageValue(Util.sspToString(systemStreamPartitionStringEntry.getKey()), systemStreamPartitionStringEntry.getValue()); - } - } - - public Checkpoint getCheckpoint() { - Map<SystemStreamPartition, String> offsetMap = new HashMap<SystemStreamPartition, String>(); - for (Map.Entry<String, String> sspToOffsetEntry : getMessageValues().entrySet()) { - offsetMap.put(Util.stringToSsp(sspToOffsetEntry.getKey()), sspToOffsetEntry.getValue()); - } - return new Checkpoint(offsetMap); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java b/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java index e00c49d..59bf2e0 100644 --- a/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java +++ b/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java @@ -20,7 +20,6 @@ package org.apache.samza.job.model; import java.util.Collections; -import java.util.Map; import java.util.Set; import org.apache.samza.Partition; import org.apache.samza.container.TaskName; @@ -41,12 +40,12 @@ import org.apache.samza.system.SystemStreamPartition; */ public class TaskModel implements Comparable<TaskModel> { private final TaskName taskName; - private final Map<SystemStreamPartition, String> systemStreamPartitionsToOffsets; + private final 
Set<SystemStreamPartition> systemStreamPartitions; private final Partition changelogPartition; - public TaskModel(TaskName taskName, Map<SystemStreamPartition, String> systemStreamPartitionsToOffsets, Partition changelogPartition) { + public TaskModel(TaskName taskName, Set<SystemStreamPartition> systemStreamPartitions, Partition changelogPartition) { this.taskName = taskName; - this.systemStreamPartitionsToOffsets = Collections.unmodifiableMap(systemStreamPartitionsToOffsets); + this.systemStreamPartitions = Collections.unmodifiableSet(systemStreamPartitions); this.changelogPartition = changelogPartition; } @@ -55,17 +54,13 @@ public class TaskModel implements Comparable<TaskModel> { } public Set<SystemStreamPartition> getSystemStreamPartitions() { - return systemStreamPartitionsToOffsets.keySet(); + return systemStreamPartitions; } public Partition getChangelogPartition() { return changelogPartition; } - public Map<SystemStreamPartition, String> getCheckpointedOffsets() { - return systemStreamPartitionsToOffsets; - } - @Override public boolean equals(Object o) { if (this == o) { @@ -80,7 +75,7 @@ public class TaskModel implements Comparable<TaskModel> { if (!changelogPartition.equals(taskModel.changelogPartition)) { return false; } - if (!systemStreamPartitionsToOffsets.equals(taskModel.systemStreamPartitionsToOffsets)) { + if (!systemStreamPartitions.equals(taskModel.systemStreamPartitions)) { return false; } if (!taskName.equals(taskModel.taskName)) { @@ -93,7 +88,7 @@ public class TaskModel implements Comparable<TaskModel> { @Override public int hashCode() { int result = taskName.hashCode(); - result = 31 * result + systemStreamPartitionsToOffsets.hashCode(); + result = 31 * result + systemStreamPartitions.hashCode(); result = 31 * result + changelogPartition.hashCode(); return result; } @@ -101,7 +96,7 @@ public class TaskModel implements Comparable<TaskModel> { @Override public String toString() { - return "TaskModel [taskName=" + taskName + ", 
systemStreamPartitions=" + systemStreamPartitionsToOffsets.keySet() + ", changeLogPartition=" + changelogPartition + "]"; + return "TaskModel [taskName=" + taskName + ", systemStreamPartitions=" + systemStreamPartitions + ", changeLogPartition=" + changelogPartition + "]"; } public int compareTo(TaskModel other) { http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java index 172358a..3ebe391 100644 --- a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java +++ b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java @@ -19,7 +19,8 @@ package org.apache.samza.serializers.model; -import java.util.Map; +import java.util.Set; + import org.apache.samza.Partition; import org.apache.samza.container.TaskName; import org.apache.samza.system.SystemStreamPartition; @@ -31,14 +32,14 @@ import org.codehaus.jackson.annotate.JsonProperty; */ public abstract class JsonTaskModelMixIn { @JsonCreator - public JsonTaskModelMixIn(@JsonProperty("task-name") TaskName taskName, @JsonProperty("system-stream-partitions-offsets") Map<SystemStreamPartition, String> systemStreamPartitionsToOffsets, @JsonProperty("changelog-partition") Partition changelogPartition) { + public JsonTaskModelMixIn(@JsonProperty("task-name") TaskName taskName, @JsonProperty("system-stream-partitions") Set<SystemStreamPartition> systemStreamPartitions, @JsonProperty("changelog-partition") Partition changelogPartition) { } @JsonProperty("task-name") abstract TaskName getTaskName(); - @JsonProperty("system-stream-partitions-offsets") - abstract Map<SystemStreamPartition, String> getCheckpointedOffsets(); + 
@JsonProperty("system-stream-partitions") + abstract Set<SystemStreamPartition> getSystemStreamPartitions(); @JsonProperty("changelog-partition") abstract Partition getChangelogPartition(); http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala index 2e3aeb8..31b208f 100644 --- a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala +++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala @@ -29,10 +29,9 @@ import org.apache.samza.container.TaskName import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory import org.apache.samza.metrics.MetricsRegistryMap import org.apache.samza.system.SystemStreamPartition -import org.apache.samza.util.CommandLine +import org.apache.samza.util.{Util, CommandLine, Logging} import org.apache.samza.{Partition, SamzaException} import scala.collection.JavaConversions._ -import org.apache.samza.util.Logging import org.apache.samza.coordinator.JobCoordinator import scala.collection.immutable.HashMap @@ -118,10 +117,12 @@ object CheckpointTool { } def apply(config: Config, offsets: TaskNameToCheckpointMap) = { - val factory = new CoordinatorStreamSystemFactory - val coordinatorStreamConsumer = factory.getCoordinatorStreamSystemConsumer(config, new MetricsRegistryMap()) - val coordinatorStreamProducer = factory.getCoordinatorStreamSystemProducer(config, new MetricsRegistryMap()) - val manager = new CheckpointManager(coordinatorStreamProducer, coordinatorStreamConsumer, "checkpoint-tool") + val manager = config.getCheckpointManagerFactory match { + case Some(className) => + Util.getObj[CheckpointManagerFactory](className).getCheckpointManager(config, new 
MetricsRegistryMap) + case _ => + throw new SamzaException("This job does not use checkpointing (task.checkpoint.factory is not set).") + } new CheckpointTool(config, offsets, manager) } http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala index 1464acc..00648e4 100644 --- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala @@ -72,8 +72,7 @@ object OffsetManager extends Logging { config: Config, checkpointManager: CheckpointManager = null, systemAdmins: Map[String, SystemAdmin] = Map(), - offsetManagerMetrics: OffsetManagerMetrics = new OffsetManagerMetrics, - latestOffsets: Map[TaskName, Map[SystemStreamPartition, String]] = Map()) = { + offsetManagerMetrics: OffsetManagerMetrics = new OffsetManagerMetrics) = { debug("Building offset manager for %s." format systemStreamMetadata) val offsetSettings = systemStreamMetadata @@ -99,7 +98,7 @@ object OffsetManager extends Logging { // Build OffsetSetting so we can create a map for OffsetManager. (systemStream, OffsetSetting(systemStreamMetadata, defaultOffsetType, resetOffset)) }.toMap - new OffsetManager(offsetSettings, checkpointManager, systemAdmins, offsetManagerMetrics, latestOffsets) + new OffsetManager(offsetSettings, checkpointManager, systemAdmins, offsetManagerMetrics) } } @@ -142,22 +141,12 @@ class OffsetManager( /** * offsetManagerMetrics for keeping track of checkpointed offsets of each SystemStreamPartition. 
*/ - val offsetManagerMetrics: OffsetManagerMetrics = new OffsetManagerMetrics, - - /* - * The previously read checkpoints restored from the coordinator stream - */ - val previousCheckpointedOffsets: Map[TaskName, Map[SystemStreamPartition, String]] = Map()) extends Logging { + val offsetManagerMetrics: OffsetManagerMetrics = new OffsetManagerMetrics) extends Logging { /** * Last offsets processed for each SystemStreamPartition. */ - // Filter out null offset values, we can't use them, these exist only because of SSP information - var lastProcessedOffsets = previousCheckpointedOffsets.map { - case (taskName, sspToOffset) => { - taskName -> sspToOffset.filter(_._2 != null) - } - } + var lastProcessedOffsets = Map[TaskName, Map[SystemStreamPartition, String]]() /** * Offsets to start reading from for each SystemStreamPartition. This @@ -175,13 +164,12 @@ class OffsetManager( def register(taskName: TaskName, systemStreamPartitionsToRegister: Set[SystemStreamPartition]) { systemStreamPartitions.getOrElseUpdate(taskName, mutable.Set[SystemStreamPartition]()).addAll(systemStreamPartitionsToRegister) // register metrics - systemStreamPartitions.foreach { case (taskName, ssp) => ssp.foreach(ssp => offsetManagerMetrics.addCheckpointedOffset(ssp, "")) } + systemStreamPartitions.foreach { case (taskName, ssp) => ssp.foreach (ssp => offsetManagerMetrics.addCheckpointedOffset(ssp, "")) } } def start { registerCheckpointManager - initializeCheckpointManager - loadOffsets + loadOffsetsFromCheckpointManager stripResetStreams loadStartingOffsets loadDefaults @@ -269,32 +257,51 @@ class OffsetManager( } } - private def initializeCheckpointManager { + /** + * Loads last processed offsets from checkpoint manager for all registered + * partitions. 
+ */ + private def loadOffsetsFromCheckpointManager { if (checkpointManager != null) { + debug("Loading offsets from checkpoint manager.") + checkpointManager.start + val result = systemStreamPartitions + .keys + .flatMap(restoreOffsetsFromCheckpoint(_)) + .toMap + lastProcessedOffsets ++= result.map { + case (taskName, sspToOffset) => { + taskName -> sspToOffset.filter { + case (systemStreamPartition, offset) => + val shouldKeep = offsetSettings.contains(systemStreamPartition.getSystemStream) + if (!shouldKeep) { + info("Ignoring previously checkpointed offset %s for %s since the offset is for a stream that is not currently an input stream." format (offset, systemStreamPartition)) + } + info("Checkpointed offset is currently %s for %s" format (offset, systemStreamPartition)) + shouldKeep + } + } + } } else { debug("Skipping offset load from checkpoint manager because no manager was defined.") } } /** - * Loads last processed offsets from checkpoint manager for all registered - * partitions. + * Loads last processed offsets for a single taskName. */ - private def loadOffsets { - debug("Loading offsets") - lastProcessedOffsets.map { - case (taskName, sspToOffsets) => { - taskName -> sspToOffsets.filter { - case (systemStreamPartition, offset) => - val shouldKeep = offsetSettings.contains(systemStreamPartition.getSystemStream) - if (!shouldKeep) { - info("Ignoring previously checkpointed offset %s for %s since the offset is for a stream that is not currently an input stream." format (offset, systemStreamPartition)) - } - info("Checkpointed offset is currently %s for %s." format (offset, systemStreamPartition)) - shouldKeep - } - } + private def restoreOffsetsFromCheckpoint(taskName: TaskName): Map[TaskName, Map[SystemStreamPartition, String]] = { + debug("Loading checkpoints for taskName: %s." 
format taskName) + + val checkpoint = checkpointManager.readLastCheckpoint(taskName) + + if (checkpoint != null) { + Map(taskName -> checkpoint.getOffsets.toMap) + } else { + info("Did not receive a checkpoint for taskName %s. Proceeding without a checkpoint." format taskName) + + Map(taskName -> Map()) } } http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala new file mode 100644 index 0000000..edd0ace --- /dev/null +++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.checkpoint.file + +import java.io.File +import java.io.FileNotFoundException +import java.io.FileOutputStream +import java.util +import org.apache.samza.SamzaException +import org.apache.samza.checkpoint.Checkpoint +import org.apache.samza.checkpoint.CheckpointManager +import org.apache.samza.checkpoint.CheckpointManagerFactory +import org.apache.samza.config.Config +import org.apache.samza.config.FileSystemCheckpointManagerConfig.Config2FSCP +import org.apache.samza.config.JobConfig.Config2Job +import org.apache.samza.container.TaskName +import org.apache.samza.metrics.MetricsRegistry +import org.apache.samza.serializers.CheckpointSerde +import scala.io.Source + +class FileSystemCheckpointManager( + jobName: String, + root: File, + serde: CheckpointSerde = new CheckpointSerde) extends CheckpointManager { + + override def register(taskName: TaskName):Unit = Unit + + def getCheckpointFile(taskName: TaskName) = getFile(jobName, taskName, "checkpoints") + + def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) { + val bytes = serde.toBytes(checkpoint) + val fos = new FileOutputStream(getCheckpointFile(taskName)) + + fos.write(bytes) + fos.close + } + + def readLastCheckpoint(taskName: TaskName): Checkpoint = { + try { + val bytes = Source.fromFile(getCheckpointFile(taskName)).map(_.toByte).toArray + + serde.fromBytes(bytes) + } catch { + case e: FileNotFoundException => null + } + } + + def start { + if (!root.exists) { + throw new SamzaException("Root directory for file system checkpoint manager does not exist: %s" format root) + } + } + + def stop {} + + private def getFile(jobName: String, taskName: TaskName, fileType:String) = + new File(root, "%s-%s-%s" format (jobName, taskName, fileType)) +} + +class FileSystemCheckpointManagerFactory extends CheckpointManagerFactory { + def getCheckpointManager(config: Config, registry: MetricsRegistry) = { + val name = config + .getName + .getOrElse(throw new SamzaException("Missing 
job name in configs")) + val root = config + .getFileSystemCheckpointRoot + .getOrElse(throw new SamzaException("Missing checkpoint root in configs")) + new FileSystemCheckpointManager(name, new File(root)) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 0b73403..3787b85 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -21,8 +21,7 @@ package org.apache.samza.container import java.io.File import org.apache.samza.SamzaException -import org.apache.samza.checkpoint.{ CheckpointManager, OffsetManager } -import org.apache.samza.config.Config +import org.apache.samza.checkpoint.{CheckpointManagerFactory, OffsetManager, OffsetManagerMetrics} import org.apache.samza.config.MetricsConfig.Config2Metrics import org.apache.samza.config.SerializerConfig.Config2Serializer import org.apache.samza.config.ShellCommandConfig @@ -61,7 +60,6 @@ import org.apache.samza.job.model.{TaskModel, ContainerModel, JobModel} import org.apache.samza.serializers.model.SamzaObjectMapper import org.apache.samza.config.JobConfig.Config2Job import java.lang.Thread.UncaughtExceptionHandler -import org.apache.samza.checkpoint.OffsetManagerMetrics object SamzaContainer extends Logging { def main(args: Array[String]) { @@ -308,15 +306,17 @@ object SamzaContainer extends Logging { val coordinatorSystemConsumer = new CoordinatorStreamSystemFactory().getCoordinatorStreamSystemConsumer(config, samzaContainerMetrics.registry) val coordinatorSystemProducer = new CoordinatorStreamSystemFactory().getCoordinatorStreamSystemProducer(config, 
samzaContainerMetrics.registry) - val checkpointManager = new CheckpointManager(coordinatorSystemProducer, coordinatorSystemConsumer, String.valueOf(containerId)) val localityManager = new LocalityManager(coordinatorSystemProducer, coordinatorSystemConsumer) - + val checkpointManager = config.getCheckpointManagerFactory() match { + case Some(checkpointFactoryClassName) if (!checkpointFactoryClassName.isEmpty) => + Util + .getObj[CheckpointManagerFactory](checkpointFactoryClassName) + .getCheckpointManager(config, samzaContainerMetrics.registry) + case _ => null + } info("Got checkpoint manager: %s" format checkpointManager) - val combinedOffsets: Map[TaskName, Map[SystemStreamPartition, String]] = - containerModel.getTasks.map{case (taskName, taskModel) => taskName -> mapAsScalaMap(taskModel.getCheckpointedOffsets).toMap }.toMap - - val offsetManager = OffsetManager(inputStreamMetadata, config, checkpointManager, systemAdmins, offsetManagerMetrics, combinedOffsets) + val offsetManager = OffsetManager(inputStreamMetadata, config, checkpointManager, systemAdmins, offsetManagerMetrics) info("Got offset manager: %s" format offsetManager) http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala index 03299cb..ef40c35 100644 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala +++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala @@ -74,7 +74,6 @@ object JobCoordinator extends Logging { coordinatorSystemConsumer.bootstrap val config = coordinatorSystemConsumer.getConfig info("Got config: %s" format config) - val checkpointManager = new CheckpointManager(coordinatorSystemProducer, 
coordinatorSystemConsumer, "Job-coordinator") val changelogManager = new ChangelogPartitionManager(coordinatorSystemProducer, coordinatorSystemConsumer, "Job-coordinator") val localityManager = new LocalityManager(coordinatorSystemProducer, coordinatorSystemConsumer) @@ -91,7 +90,7 @@ object JobCoordinator extends Logging { val streamMetadataCache = new StreamMetadataCache(systemAdmins) - val jobCoordinator = getJobCoordinator(rewriteConfig(config), checkpointManager, changelogManager, localityManager, streamMetadataCache) + val jobCoordinator = getJobCoordinator(rewriteConfig(config), changelogManager, localityManager, streamMetadataCache) createChangeLogStreams(config, jobCoordinator.jobModel.maxChangeLogStreamPartitions, streamMetadataCache) jobCoordinator @@ -103,14 +102,13 @@ object JobCoordinator extends Logging { * Build a JobCoordinator using a Samza job's configuration. */ def getJobCoordinator(config: Config, - checkpointManager: CheckpointManager, changelogManager: ChangelogPartitionManager, localityManager: LocalityManager, streamMetadataCache: StreamMetadataCache) = { - val jobModelGenerator = initializeJobModel(config, checkpointManager, changelogManager, localityManager, streamMetadataCache) + val jobModelGenerator = initializeJobModel(config, changelogManager, localityManager, streamMetadataCache) val server = new HttpServer server.addServlet("/*", new JobServlet(jobModelGenerator)) - currentJobCoordinator = new JobCoordinator(jobModelGenerator(), server, checkpointManager) + currentJobCoordinator = new JobCoordinator(jobModelGenerator(), server) currentJobCoordinator } @@ -170,7 +168,6 @@ object JobCoordinator extends Logging { * which catchup with the latest content from the coordinator stream. 
*/ private def initializeJobModel(config: Config, - checkpointManager: CheckpointManager, changelogManager: ChangelogPartitionManager, localityManager: LocalityManager, streamMetadataCache: StreamMetadataCache): () => JobModel = { @@ -192,10 +189,6 @@ object JobCoordinator extends Logging { { new util.HashMap[TaskName, Integer]() } - - checkpointManager.start() - groups.foreach(taskSSP => checkpointManager.register(taskSSP._1)) - // We don't need to start() localityManager as they share the same instances with checkpoint and changelog managers. // TODO: This code will go away with refactoring - SAMZA-678 @@ -204,7 +197,6 @@ object JobCoordinator extends Logging { // Generate the jobModel def jobModelGenerator(): JobModel = refreshJobModel(config, allSystemStreamPartitions, - checkpointManager, groups, previousChangelogMapping, localityManager) @@ -238,7 +230,6 @@ object JobCoordinator extends Logging { */ private def refreshJobModel(config: Config, allSystemStreamPartitions: util.Set[SystemStreamPartition], - checkpointManager: CheckpointManager, groups: util.Map[TaskName, util.Set[SystemStreamPartition]], previousChangelogMapping: util.Map[TaskName, Integer], localityManager: LocalityManager): JobModel = { @@ -253,17 +244,6 @@ object JobCoordinator extends Logging { { groups.map { case (taskName, systemStreamPartitions) => - val checkpoint = Option(checkpointManager.readLastCheckpoint(taskName)).getOrElse(new Checkpoint(new util.HashMap[SystemStreamPartition, String]())) - // Find the system partitions which don't have a checkpoint and set null for the values for offsets - val taskOffsets = checkpoint.getOffsets - val offsetMap = new util.HashMap[SystemStreamPartition, String]() - systemStreamPartitions.foreach { - ssp => - if(taskOffsets.containsKey(ssp)) - offsetMap.put(ssp, taskOffsets.get(ssp)) - else - offsetMap.put(ssp, null) - } val changelogPartition = Option(previousChangelogMapping.get(taskName)) match { case Some(changelogPartitionId) => new 
Partition(changelogPartitionId) @@ -274,7 +254,7 @@ object JobCoordinator extends Logging { info("New task %s is being assigned changelog partition %s." format(taskName, maxChangelogPartitionId)) new Partition(maxChangelogPartitionId) } - new TaskModel(taskName, offsetMap, changelogPartition) + new TaskModel(taskName, systemStreamPartitions, changelogPartition) }.toSet } @@ -336,12 +316,7 @@ class JobCoordinator( /** * HTTP server used to serve a Samza job's container model to SamzaContainers when they start up. */ - val server: HttpServer = null, - - /** - * Handle to checkpoint manager that's used to refresh the JobModel - */ - val checkpointManager: CheckpointManager) extends Logging { + val server: HttpServer = null) extends Logging { debug("Got job model: %s." format jobModel) @@ -358,9 +333,6 @@ class JobCoordinator( debug("Stopping HTTP server.") server.stop info("Stopped HTTP server.") - debug("Stopping checkpoint manager.") - checkpointManager.stop() - info("Stopped checkpoint manager.") } } } http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/main/scala/org/apache/samza/migration/JobRunnerMigration.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/migration/JobRunnerMigration.scala b/samza-core/src/main/scala/org/apache/samza/migration/JobRunnerMigration.scala index 374e27e..f38b87a 100644 --- a/samza-core/src/main/scala/org/apache/samza/migration/JobRunnerMigration.scala +++ b/samza-core/src/main/scala/org/apache/samza/migration/JobRunnerMigration.scala @@ -24,7 +24,10 @@ import org.apache.samza.SamzaException object JobRunnerMigration { - val CHECKPOINTMIGRATION = "old.checkpoint.KafkaCheckpointMigration" + val CHECKPOINT_MIGRATION = "org.apache.samza.migration.KafkaCheckpointMigration" + val UNSUPPORTED_ERROR_MSG = "Auto checkpoint migration for 0.10.0 upgrade is only supported for Kafka checkpointing system, " + + "for everything else, please use 
the checkpoint tool to migrate the taskname-to-changelog mapping, and add " + + "task.checkpoint.skip-migration=true to your configs." def apply(config: Config) = { val migration = new JobRunnerMigration migration.checkpointMigration(config) @@ -38,15 +41,18 @@ class JobRunnerMigration extends Logging { checkpointFactory match { case Some("org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory") => info("Performing checkpoint migration") - val checkpointMigrationPlan = Util.getObj[MigrationPlan](JobRunnerMigration.CHECKPOINTMIGRATION) + val checkpointMigrationPlan = Util.getObj[MigrationPlan](JobRunnerMigration.CHECKPOINT_MIGRATION) checkpointMigrationPlan.migrate(config) case None => info("No task.checkpoint.factory defined, not performing any checkpoint migration") case _ => - val errorMsg = "Auto checkpoint migration for 0.10.0 upgrade is only supported for Kafka checkpointing system, " + - "for everything else, please use the checkpoint tool and remove task.checkpoint.factory configuration" - error(errorMsg) - throw new SamzaException(errorMsg) + val skipMigration = config.getBoolean("task.checkpoint.skip-migration", false) + if (skipMigration) { + info("Job is configured to skip any checkpoint migration.") + } else { + error(JobRunnerMigration.UNSUPPORTED_ERROR_MSG) + throw new SamzaException(JobRunnerMigration.UNSUPPORTED_ERROR_MSG) + } } } } http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java index dd04d28..429573b 100644 --- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java +++ 
b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java @@ -20,22 +20,18 @@ package org.apache.samza.coordinator.stream; import java.io.IOException; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import org.apache.samza.SamzaException; -import org.apache.samza.checkpoint.Checkpoint; import org.apache.samza.config.Config; import org.apache.samza.coordinator.stream.messages.SetChangelogMapping; -import org.apache.samza.coordinator.stream.messages.SetCheckpoint; import org.apache.samza.coordinator.stream.messages.SetConfig; import org.apache.samza.serializers.model.SamzaObjectMapper; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.util.BlockingEnvelopeMap; -import org.apache.samza.util.Util; import org.codehaus.jackson.map.ObjectMapper; /** @@ -47,7 +43,6 @@ import org.codehaus.jackson.map.ObjectMapper; public class MockCoordinatorStreamWrappedConsumer extends BlockingEnvelopeMap { private final static ObjectMapper MAPPER = SamzaObjectMapper.getObjectMapper(); public final static String CHANGELOGPREFIX = "ch:"; - public final static String CHECKPOINTPREFIX = "cp:"; public final CountDownLatch blockConsumerPoll = new CountDownLatch(1); public boolean blockpollFlag = false; @@ -78,16 +73,7 @@ public class MockCoordinatorStreamWrappedConsumer extends BlockingEnvelopeMap { for (Map.Entry<String, String> configPair : config.entrySet()) { byte[] keyBytes = null; byte[] messgeBytes = null; - if (configPair.getKey().startsWith(CHECKPOINTPREFIX)) { - String[] checkpointInfo = configPair.getKey().split(":"); - String[] sspOffsetPair = configPair.getValue().split(":"); - HashMap<SystemStreamPartition, String> checkpointMap = new HashMap<SystemStreamPartition, String>(); - checkpointMap.put(Util.stringToSsp(sspOffsetPair[0]), sspOffsetPair[1]); - Checkpoint 
cp = new Checkpoint(checkpointMap); - SetCheckpoint setCheckpoint = new SetCheckpoint(checkpointInfo[1], checkpointInfo[2], cp); - keyBytes = MAPPER.writeValueAsString(setCheckpoint.getKeyArray()).getBytes("UTF-8"); - messgeBytes = MAPPER.writeValueAsString(setCheckpoint.getMessageMap()).getBytes("UTF-8"); - } else if (configPair.getKey().startsWith(CHANGELOGPREFIX)) { + if (configPair.getKey().startsWith(CHANGELOGPREFIX)) { String[] changelogInfo = configPair.getKey().split(":"); String changeLogPartition = configPair.getValue(); SetChangelogMapping changelogMapping = new SetChangelogMapping(changelogInfo[1], changelogInfo[2], Integer.parseInt(changeLogPartition)); http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java index ad1fbc5..2c64598 100644 --- a/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java +++ b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java @@ -21,7 +21,10 @@ package org.apache.samza.serializers.model; import static org.junit.Assert.assertEquals; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; + import org.apache.samza.Partition; import org.apache.samza.config.Config; import org.apache.samza.config.MapConfig; @@ -38,12 +41,12 @@ public class TestSamzaObjectMapper { public void testJsonTaskModel() throws Exception { ObjectMapper mapper = SamzaObjectMapper.getObjectMapper(); Map<String, String> configMap = new HashMap<String, String>(); - Map<SystemStreamPartition, String> sspOffset = new HashMap<SystemStreamPartition, String>(); + Set<SystemStreamPartition> ssp = new HashSet<>(); 
configMap.put("a", "b"); Config config = new MapConfig(configMap); TaskName taskName = new TaskName("test"); - sspOffset.put(new SystemStreamPartition("foo", "bar", new Partition(1)), ""); - TaskModel taskModel = new TaskModel(taskName, sspOffset, new Partition(2)); + ssp.add(new SystemStreamPartition("foo", "bar", new Partition(1))); + TaskModel taskModel = new TaskModel(taskName, ssp, new Partition(2)); Map<TaskName, TaskModel> tasks = new HashMap<TaskName, TaskModel>(); tasks.put(taskName, taskModel); ContainerModel containerModel = new ContainerModel(1, tasks); http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala index 00b8977..0865b31 100644 --- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala +++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala @@ -41,7 +41,7 @@ object TestCheckpointTool { var systemProducer: SystemProducer = null var systemAdmin: SystemAdmin = null - class MockCheckpointManagerFactory { + class MockCheckpointManagerFactory extends CheckpointManagerFactory { def getCheckpointManager(config: Config, registry: MetricsRegistry) = checkpointManager } @@ -87,7 +87,7 @@ class TestCheckpointTool extends AssertionsForJUnit with MockitoSugar { @Test def testReadLatestCheckpoint { - val checkpointTool = new CheckpointTool(config, null, TestCheckpointTool.checkpointManager) + val checkpointTool = CheckpointTool(config, null) checkpointTool.run verify(TestCheckpointTool.checkpointManager).readLastCheckpoint(tn0) verify(TestCheckpointTool.checkpointManager).readLastCheckpoint(tn1) @@ -99,7 +99,7 @@ class TestCheckpointTool extends AssertionsForJUnit with MockitoSugar { 
val toOverwrite = Map(tn0 -> Map(new SystemStreamPartition("test", "foo", p0) -> "42"), tn1 -> Map(new SystemStreamPartition("test", "foo", p1) -> "43")) - val checkpointTool = new CheckpointTool(config, toOverwrite, TestCheckpointTool.checkpointManager) + val checkpointTool = CheckpointTool(config, toOverwrite) checkpointTool.run verify(TestCheckpointTool.checkpointManager) .writeCheckpoint(tn0, new Checkpoint(Map(new SystemStreamPartition("test", "foo", p0) -> "42"))) http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala index c00ef91..75ba8af 100644 --- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala +++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala @@ -65,7 +65,7 @@ class TestOffsetManager { val config = new MapConfig val checkpointManager = getCheckpointManager(systemStreamPartition, taskName) val systemAdmins = Map("test-system" -> getSystemAdmin) - val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins, new OffsetManagerMetrics, checkpointManager.getOffets) + val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins, new OffsetManagerMetrics) offsetManager.register(taskName, Set(systemStreamPartition)) offsetManager.start assertTrue(checkpointManager.isStarted) @@ -97,7 +97,7 @@ class TestOffsetManager { val config = new MapConfig val checkpointManager = getCheckpointManager(systemStreamPartition, taskName) val systemAdmins = Map("test-system" -> getSystemAdmin) - val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins, new OffsetManagerMetrics, 
checkpointManager.getOffets) + val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins, new OffsetManagerMetrics) offsetManager.register(taskName, Set(systemStreamPartition)) offsetManager.start // Should get offset 45 back from the checkpoint manager, which is last processed, and system admin should return 46 as starting offset. @@ -242,17 +242,17 @@ class TestOffsetManager { private def getCheckpointManager(systemStreamPartition: SystemStreamPartition, taskName:TaskName = new TaskName("taskName")) = { val checkpoint = new Checkpoint(Map(systemStreamPartition -> "45")) - new CheckpointManager(null, null, null) { + new CheckpointManager { var isStarted = false var isStopped = false var registered = Set[TaskName]() var checkpoints: Map[TaskName, Checkpoint] = Map(taskName -> checkpoint) var taskNameToPartitionMapping: util.Map[TaskName, java.lang.Integer] = new util.HashMap[TaskName, java.lang.Integer]() - override def start { isStarted = true } - override def register(taskName: TaskName) { registered += taskName } - override def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) { checkpoints += taskName -> checkpoint } - override def readLastCheckpoint(taskName: TaskName) = checkpoints.getOrElse(taskName, null) - override def stop { isStopped = true } + def start { isStarted = true } + def register(taskName: TaskName) { registered += taskName } + def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) { checkpoints += taskName -> checkpoint } + def readLastCheckpoint(taskName: TaskName) = checkpoints.getOrElse(taskName, null) + def stop { isStopped = true } // Only for testing purposes - not present in actual checkpoint manager def getOffets = Map(taskName -> mapAsScalaMap(checkpoint.getOffsets()).toMap) http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala 
---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala new file mode 100644 index 0000000..4ca738e --- /dev/null +++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.checkpoint.file + +import java.io.File +import scala.collection.JavaConversions._ +import java.util.Random +import org.junit.Assert._ +import org.junit.{After, Before, Test} +import org.apache.samza.SamzaException +import org.apache.samza.Partition +import org.apache.samza.checkpoint.Checkpoint +import org.apache.samza.system.SystemStreamPartition +import org.apache.samza.container.TaskName +import org.junit.rules.TemporaryFolder + +class TestFileSystemCheckpointManager { + val checkpointRoot = System.getProperty("java.io.tmpdir") // TODO: Move this out of tmp, into our build dir + val taskName = new TaskName("Warwickshire") + val baseFileLocation = new File(checkpointRoot) + + val tempFolder = new TemporaryFolder + + @Before + def createTempFolder = tempFolder.create() + + @After + def deleteTempFolder = tempFolder.delete() + + @Test + def testReadForCheckpointFileThatDoesNotExistShouldReturnNull { + val cpm = new FileSystemCheckpointManager("some-job-name", tempFolder.getRoot) + assertNull(cpm.readLastCheckpoint(taskName)) + } + + @Test + def testReadForCheckpointFileThatDoesExistShouldReturnProperCheckpoint { + val cp = new Checkpoint(Map( + new SystemStreamPartition("a", "b", new Partition(0)) -> "c", + new SystemStreamPartition("a", "c", new Partition(1)) -> "d", + new SystemStreamPartition("b", "d", new Partition(2)) -> "e")) + + var readCp:Checkpoint = null + val cpm = new FileSystemCheckpointManager("some-job-name", tempFolder.getRoot) + + cpm.start + cpm.writeCheckpoint(taskName, cp) + readCp = cpm.readLastCheckpoint(taskName) + cpm.stop + + assertNotNull(readCp) + cp.equals(readCp) + assertEquals(cp.getOffsets.keySet(), readCp.getOffsets.keySet()) + assertEquals(cp.getOffsets, readCp.getOffsets) + assertEquals(cp, readCp) + } + + @Test + def testMissingRootDirectoryShouldFailOnManagerCreation { + val cpm = new FileSystemCheckpointManager("some-job-name", new File(checkpointRoot + new Random().nextInt)) + try { + cpm.start 
+ fail("Expected an exception since root directory for fs checkpoint manager doesn't exist.") + } catch { + case e: SamzaException => None // this is expected + } + cpm.stop + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala index a77ddc7..d91b1da 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala @@ -54,7 +54,7 @@ import org.scalatest.junit.AssertionsForJUnit import java.lang.Thread.UncaughtExceptionHandler import org.apache.samza.serializers._ import org.apache.samza.SamzaException -import org.apache.samza.checkpoint.CheckpointManager +import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager} class TestSamzaContainer extends AssertionsForJUnit { @Test @@ -63,15 +63,15 @@ class TestSamzaContainer extends AssertionsForJUnit { val offsets = new util.HashMap[SystemStreamPartition, String]() offsets.put(new SystemStreamPartition("system","stream", new Partition(0)), "1") val tasks = Map( - new TaskName("t1") -> new TaskModel(new TaskName("t1"), offsets, new Partition(0)), - new TaskName("t2") -> new TaskModel(new TaskName("t2"), offsets, new Partition(0))) + new TaskName("t1") -> new TaskModel(new TaskName("t1"), offsets.keySet(), new Partition(0)), + new TaskName("t2") -> new TaskModel(new TaskName("t2"), offsets.keySet(), new Partition(0))) val containers = Map( Integer.valueOf(0) -> new ContainerModel(0, tasks), Integer.valueOf(1) -> new ContainerModel(1, tasks)) val jobModel = new JobModel(config, containers) def jobModelGenerator(): JobModel = jobModel val server = new 
HttpServer - val coordinator = new JobCoordinator(jobModel, server, new MockCheckpointManager) + val coordinator = new JobCoordinator(jobModel, server) coordinator.server.addServlet("/*", new JobServlet(jobModelGenerator)) try { coordinator.start @@ -87,12 +87,12 @@ class TestSamzaContainer extends AssertionsForJUnit { val offsets = new util.HashMap[SystemStreamPartition, String]() offsets.put(new SystemStreamPartition("system", "stream", new Partition(0)), "1") val tasksForContainer1 = Map( - new TaskName("t1") -> new TaskModel(new TaskName("t1"), offsets, new Partition(0)), - new TaskName("t2") -> new TaskModel(new TaskName("t2"), offsets, new Partition(1))) + new TaskName("t1") -> new TaskModel(new TaskName("t1"), offsets.keySet(), new Partition(0)), + new TaskName("t2") -> new TaskModel(new TaskName("t2"), offsets.keySet(), new Partition(1))) val tasksForContainer2 = Map( - new TaskName("t3") -> new TaskModel(new TaskName("t3"), offsets, new Partition(2)), - new TaskName("t4") -> new TaskModel(new TaskName("t4"), offsets, new Partition(3)), - new TaskName("t5") -> new TaskModel(new TaskName("t6"), offsets, new Partition(4))) + new TaskName("t3") -> new TaskModel(new TaskName("t3"), offsets.keySet(), new Partition(2)), + new TaskName("t4") -> new TaskModel(new TaskName("t4"), offsets.keySet(), new Partition(3)), + new TaskName("t5") -> new TaskModel(new TaskName("t6"), offsets.keySet(), new Partition(4))) val containerModel1 = new ContainerModel(0, tasksForContainer1) val containerModel2 = new ContainerModel(1, tasksForContainer2) val containers = Map( @@ -204,7 +204,13 @@ class TestSamzaContainer extends AssertionsForJUnit { } } -class MockCheckpointManager extends CheckpointManager(null, null, "Unknown") { +class MockCheckpointManager extends CheckpointManager { override def start() = {} override def stop() = {} + + override def register(taskName: TaskName): Unit = {} + + override def readLastCheckpoint(taskName: TaskName): Checkpoint = { new 
Checkpoint(Map[SystemStreamPartition, String]()) } + + override def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint): Unit = { } } http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala b/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala index ddf1fde..6e9c6fa 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala @@ -71,6 +71,6 @@ class TestGroupByContainerCount { } private def getTaskModel(name: String, partitionId: Int) = { - new TaskModel(new TaskName(name), Map[SystemStreamPartition, String](), new Partition(partitionId)) + new TaskModel(new TaskName(name), Set[SystemStreamPartition](), new Partition(partitionId)) } } http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala index 1393da8..80cccf3 100644 --- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala +++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala @@ -19,19 +19,14 @@ package org.apache.samza.coordinator - -import java.net.SocketTimeoutException - -import org.apache.samza.util.Util import org.junit.{After, Test} import org.junit.Assert._ -import org.junit.rules.ExpectedException import scala.collection.JavaConversions._ 
import org.apache.samza.config.MapConfig import org.apache.samza.config.TaskConfig import org.apache.samza.config.SystemConfig import org.apache.samza.container.{SamzaContainer, TaskName} -import org.apache.samza.metrics.{MetricsRegistryMap, MetricsRegistry} +import org.apache.samza.metrics.MetricsRegistry import org.apache.samza.config.Config import org.apache.samza.system.SystemFactory import org.apache.samza.system.SystemAdmin @@ -67,20 +62,16 @@ class TestJobCoordinator { // Construct the expected JobModel, so we can compare it to // JobCoordinator's JobModel. val container0Tasks = Map( - task0Name -> new TaskModel(task0Name, checkpoint0, new Partition(4)), - task2Name -> new TaskModel(task2Name, checkpoint2, new Partition(5))) + task0Name -> new TaskModel(task0Name, checkpoint0.keySet, new Partition(4)), + task2Name -> new TaskModel(task2Name, checkpoint2.keySet, new Partition(5))) val container1Tasks = Map( - task1Name -> new TaskModel(task1Name, checkpoint1, new Partition(3))) + task1Name -> new TaskModel(task1Name, checkpoint1.keySet, new Partition(3))) val containers = Map( Integer.valueOf(0) -> new ContainerModel(0, container0Tasks), Integer.valueOf(1) -> new ContainerModel(1, container1Tasks)) // The test does not pass offsets for task2 (Partition 2) to the checkpointmanager, this will verify that we get an offset 0 for this partition - val checkpointOffset0 = MockCoordinatorStreamWrappedConsumer.CHECKPOINTPREFIX + "mock:" + - task0Name.getTaskName() -> (Util.sspToString(checkpoint0.keySet.iterator.next()) + ":" + checkpoint0.values.iterator.next()) - val checkpointOffset1 = MockCoordinatorStreamWrappedConsumer.CHECKPOINTPREFIX + "mock:" + - task1Name.getTaskName() -> (Util.sspToString(checkpoint1.keySet.iterator.next()) + ":" + checkpoint1.values.iterator.next()) val changelogInfo0 = MockCoordinatorStreamWrappedConsumer.CHANGELOGPREFIX + "mock:" + task0Name.getTaskName() -> "4" val changelogInfo1 = MockCoordinatorStreamWrappedConsumer.CHANGELOGPREFIX + 
"mock:" + task1Name.getTaskName() -> "3" val changelogInfo2 = MockCoordinatorStreamWrappedConsumer.CHANGELOGPREFIX + "mock:" + task2Name.getTaskName() -> "5" @@ -88,8 +79,6 @@ class TestJobCoordinator { // Configs which are processed by the MockCoordinatorStream as special configs which are interpreted as // SetCheckpoint and SetChangelog val otherConfigs = Map( - checkpointOffset0, - checkpointOffset1, changelogInfo0, changelogInfo1, changelogInfo2 @@ -116,43 +105,32 @@ class TestJobCoordinator { } @Test - def testJobCoordinatorCheckpointing = { + def testJobCoordinatorChangelogPartitionMapping = { System.out.println("test ") val task0Name = new TaskName("Partition 0") - val checkpoint0 = Map(new SystemStreamPartition("test", "stream1", new Partition(0)) -> "4") + val ssp0 = Set(new SystemStreamPartition("test", "stream1", new Partition(0))) val task1Name = new TaskName("Partition 1") - val checkpoint1 = Map(new SystemStreamPartition("test", "stream1", new Partition(1)) ->"3") + val ssp1 = Set(new SystemStreamPartition("test", "stream1", new Partition(1))) val task2Name = new TaskName("Partition 2") - val checkpoint2 = Map(new SystemStreamPartition("test", "stream1", new Partition(2)) -> "8") + val ssp2 = Set(new SystemStreamPartition("test", "stream1", new Partition(2))) // Construct the expected JobModel, so we can compare it to // JobCoordinator's JobModel. 
val container0Tasks = Map( - task0Name -> new TaskModel(task0Name, checkpoint0, new Partition(4)), - task2Name -> new TaskModel(task2Name, checkpoint2, new Partition(5))) + task0Name -> new TaskModel(task0Name, ssp0, new Partition(4)), + task2Name -> new TaskModel(task2Name, ssp1, new Partition(5))) val container1Tasks = Map( - task1Name -> new TaskModel(task1Name, checkpoint1, new Partition(3))) + task1Name -> new TaskModel(task1Name, ssp1, new Partition(3))) val containers = Map( Integer.valueOf(0) -> new ContainerModel(0, container0Tasks), Integer.valueOf(1) -> new ContainerModel(1, container1Tasks)) - - // The test does not pass offsets for task2 (Partition 2) to the checkpointmanager, this will verify that we get an offset 0 for this partition - val checkpointOffset0 = MockCoordinatorStreamWrappedConsumer.CHECKPOINTPREFIX + "mock:" + - task0Name.getTaskName() -> (Util.sspToString(checkpoint0.keySet.iterator.next()) + ":" + checkpoint0.values.iterator.next()) - val checkpointOffset1 = MockCoordinatorStreamWrappedConsumer.CHECKPOINTPREFIX + "mock:" + - task1Name.getTaskName() -> (Util.sspToString(checkpoint1.keySet.iterator.next()) + ":" + checkpoint1.values.iterator.next()) - val checkpointOffset2 = MockCoordinatorStreamWrappedConsumer.CHECKPOINTPREFIX + "mock:" + - task2Name.getTaskName() -> (Util.sspToString(checkpoint2.keySet.iterator.next()) + ":" + checkpoint2.values.iterator.next()) val changelogInfo0 = MockCoordinatorStreamWrappedConsumer.CHANGELOGPREFIX + "mock:" + task0Name.getTaskName() -> "4" - val changelogInfo1 = MockCoordinatorStreamWrappedConsumer.CHANGELOGPREFIX + "mock:" + task1Name.getTaskName() -> "3" - val changelogInfo2 = MockCoordinatorStreamWrappedConsumer.CHANGELOGPREFIX + "mock:" + task2Name.getTaskName() -> "5" // Configs which are processed by the MockCoordinatorStream as special configs which are interpreted as // SetCheckpoint and SetChangelog // Write a couple of checkpoints that the job coordinator will process val otherConfigs = 
Map( - checkpointOffset0, changelogInfo0 ) @@ -175,62 +153,22 @@ class TestJobCoordinator { val url = coordinator.server.getUrl.toString // Verify if the jobCoordinator has seen the checkpoints - var offsets = extractOffsetsFromJobCoordinator(url) - assertEquals(1, offsets.size) - assertEquals(checkpoint0.head._2, offsets.getOrElse(checkpoint0.head._1, fail())) - - // Write more checkpoints - val wrappedConsumer = new MockCoordinatorStreamSystemFactory() - .getConsumer(null, null, null) - .asInstanceOf[MockCoordinatorStreamWrappedConsumer] - - var moreMessageConfigs = Map( - checkpointOffset1 - ) - - wrappedConsumer.addMoreMessages(new MapConfig(moreMessageConfigs)) - - // Verify if the coordinator has seen it - offsets = extractOffsetsFromJobCoordinator(url) - assertEquals(2, offsets.size) - assertEquals(checkpoint0.head._2, offsets.getOrElse(checkpoint0.head._1, fail())) - assertEquals(checkpoint1.head._2, offsets.getOrElse(checkpoint1.head._1, fail())) - - // Write more checkpoints but block on read on the mock consumer - moreMessageConfigs = Map( - checkpointOffset2 - ) - - wrappedConsumer.addMoreMessages(new MapConfig(moreMessageConfigs)) - - // Simulate consumer being blocked (Job coordinator waiting to read new checkpoints from coordinator after container failure) - val latch = wrappedConsumer.blockPool(); - - // verify if the port times out - var seenException = false - try { - extractOffsetsFromJobCoordinator(url) - } - catch { - case se: SocketTimeoutException => seenException = true - } - assertTrue(seenException) + val changelogPartitionMapping = extractChangelogPartitionMapping(url) + assertEquals(3, changelogPartitionMapping.size) + val expectedChangelogPartitionMapping = Map(task0Name -> 4, task1Name -> 5, task2Name -> 6) + assertEquals(expectedChangelogPartitionMapping.get(task0Name), changelogPartitionMapping.get(task0Name)) + assertEquals(expectedChangelogPartitionMapping.get(task1Name), changelogPartitionMapping.get(task1Name)) + 
assertEquals(expectedChangelogPartitionMapping.get(task2Name), changelogPartitionMapping.get(task2Name)) - // verify if it has read the new checkpoints after job coordinator has loaded the new checkpoints - latch.countDown() - offsets = extractOffsetsFromJobCoordinator(url) - assertEquals(offsets.size, 3) - assertEquals(checkpoint0.head._2, offsets.getOrElse(checkpoint0.head._1, fail())) - assertEquals(checkpoint1.head._2, offsets.getOrElse(checkpoint1.head._1, fail())) - assertEquals(checkpoint2.head._2, offsets.getOrElse(checkpoint2.head._1, fail())) coordinator.stop } - def extractOffsetsFromJobCoordinator(url : String) = { + def extractChangelogPartitionMapping(url : String) = { val jobModel = SamzaContainer.readJobModel(url.toString) val taskModels = jobModel.getContainers.values().flatMap(_.getTasks.values()) - val offsets = taskModels.flatMap(_.getCheckpointedOffsets).toMap - offsets.filter(_._2 != null) + taskModels.map{taskModel => { + taskModel.getTaskName -> taskModel.getChangelogPartition.getPartitionId + }}.toMap } http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala b/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala index 9036e81..e97656a 100644 --- a/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala +++ b/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala @@ -24,6 +24,7 @@ import java.io.File import org.apache.samza.SamzaException import org.apache.samza.config.Config import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory +import org.apache.samza.migration.JobRunnerMigration import org.junit.Test import org.junit.After import org.junit.Assert._ @@ -51,8 +52,7 @@ class TestJobRunner { "file://%s/src/test/resources/test-migration-fail.properties" format new 
File(".").getCanonicalPath)) fail("Should have failed already.") } catch { - case se: SamzaException => assertEquals(se.getMessage, "Auto checkpoint migration for 0.10.0 upgrade is only supported for Kafka checkpointing system, " + - "for everything else, please use the checkpoint tool and remove task.checkpoint.factory configuration") + case se: SamzaException => assertEquals(se.getMessage, JobRunnerMigration.UNSUPPORTED_ERROR_MSG) } } http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala b/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala index a1efe6f..3a710a8 100644 --- a/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala +++ b/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala @@ -56,7 +56,7 @@ class TestProcessJob { } } -class MockJobCoordinator extends JobCoordinator(null, null, null) { +class MockJobCoordinator extends JobCoordinator(null, null) { var stopped: Boolean = false override def start: Unit = { }
