Updated Branches: refs/heads/master 9af91314e -> fe8df6373
SAMZA-131: Added further Javadoc for various API classes/interfaces Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/f63ce72a Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/f63ce72a Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/f63ce72a Branch: refs/heads/master Commit: f63ce72ade2390eec091ade5f77e22eb768733d2 Parents: 21c07c8 Author: Jonathan Poltak Samosir <jonathan dot samosir at gmail dot com> Authored: Tue Jan 21 15:22:30 2014 -0800 Committer: Jakob Homan <[email protected]> Committed: Tue Jan 21 15:22:30 2014 -0800 ---------------------------------------------------------------------- .../main/java/org/apache/samza/Partition.java | 7 +++++ .../org/apache/samza/checkpoint/Checkpoint.java | 11 +++++++ .../samza/checkpoint/CheckpointManager.java | 17 +++++++++++ .../apache/samza/serializers/Deserializer.java | 10 +++++++ .../apache/samza/serializers/Serializer.java | 9 ++++++ .../samza/storage/StorageEngineFactory.java | 22 ++++++-------- .../samza/system/OutgoingMessageEnvelope.java | 31 ++++++++++++++++++++ .../org/apache/samza/system/SystemConsumer.java | 10 +++++++ .../org/apache/samza/system/SystemProducer.java | 12 ++++++++ .../org/apache/samza/system/SystemStream.java | 12 ++++++++ .../samza/system/SystemStreamPartition.java | 2 +- .../java/org/apache/samza/task/StreamTask.java | 6 ++-- 12 files changed, 133 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f63ce72a/samza-api/src/main/java/org/apache/samza/Partition.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/Partition.java b/samza-api/src/main/java/org/apache/samza/Partition.java index 51b7e38..ebb77ed 100644 --- a/samza-api/src/main/java/org/apache/samza/Partition.java +++ 
b/samza-api/src/main/java/org/apache/samza/Partition.java @@ -19,9 +19,16 @@ package org.apache.samza; +/** + * Used to represent a Samza stream partition. + */ public class Partition { private final int partition; + /** + * Constructs a new Samza stream partition from a specified partition number. + * @param partition An integer identifying the partition in a Samza stream. + */ public Partition(int partition) { this.partition = partition; } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f63ce72a/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java b/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java index 22fa582..dcf81bf 100644 --- a/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java +++ b/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java @@ -24,13 +24,24 @@ import java.util.Map; import org.apache.samza.system.SystemStream; +/** + * Used to represent a checkpoint in the running of a Samza system. + */ public class Checkpoint { private final Map<SystemStream, String> offsets; + /** + * Constructs a new checkpoint based on a map of Samza stream offsets. + * @param offsets Map of Samza streams to their current offset. + */ public Checkpoint(Map<SystemStream, String> offsets) { this.offsets = offsets; } + /** + * Gets an unmodifiable view of the current Samza stream offsets. + * @return An unmodifiable view of a Map of Samza streams to their recorded offsets.
+ */ public Map<SystemStream, String> getOffsets() { return Collections.unmodifiableMap(offsets); } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f63ce72a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java index 2890de5..34f50fd 100644 --- a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java +++ b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java @@ -21,13 +21,30 @@ package org.apache.samza.checkpoint; import org.apache.samza.Partition; +/** + * Used as a standard interface for writing out checkpoints for a specified partition. + */ public interface CheckpointManager { public void start(); + /** + * Registers this manager to write checkpoints of a specific Samza stream partition. + * @param partition Specific Samza stream partition for which to write checkpoints. + */ public void register(Partition partition); + /** + * Writes a checkpoint based on the current state of a Samza stream partition. + * @param partition Specific Samza stream partition for which to write a checkpoint. + * @param checkpoint The Checkpoint object holding the offset data to write. + */ public void writeCheckpoint(Partition partition, Checkpoint checkpoint); + /** + * Returns the last recorded checkpoint for a specified Samza stream partition. + * @param partition Specific Samza stream partition for which to get the last checkpoint. + * @return A Checkpoint object with the recorded offset data of the specified partition.
+ */ public Checkpoint readLastCheckpoint(Partition partition); public void stop(); http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f63ce72a/samza-api/src/main/java/org/apache/samza/serializers/Deserializer.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/serializers/Deserializer.java b/samza-api/src/main/java/org/apache/samza/serializers/Deserializer.java index c6264b9..fe72223 100644 --- a/samza-api/src/main/java/org/apache/samza/serializers/Deserializer.java +++ b/samza-api/src/main/java/org/apache/samza/serializers/Deserializer.java @@ -19,6 +19,16 @@ package org.apache.samza.serializers; +/** + * A standard interface for Samza compatible deserializers, used for deserializing serialized objects back to their + * original form. + * @param <T> The type of serialized object this deserializer should be implemented to deserialize. + */ public interface Deserializer<T> { + /** + * Deserializes given serialized object from an array of bytes to its original form. + * @param bytes Array of bytes representing serialized object. + * @return Original deserialized object. + */ T fromBytes(byte[] bytes); } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f63ce72a/samza-api/src/main/java/org/apache/samza/serializers/Serializer.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/serializers/Serializer.java b/samza-api/src/main/java/org/apache/samza/serializers/Serializer.java index 1217698..932e9a5 100644 --- a/samza-api/src/main/java/org/apache/samza/serializers/Serializer.java +++ b/samza-api/src/main/java/org/apache/samza/serializers/Serializer.java @@ -19,6 +19,15 @@ package org.apache.samza.serializers; +/** + * A standard interface for Samza compatible serializers, used for serializing objects to bytes. + * @param <T> The type of object this serializer should be implemented to serialize. 
+ */ public interface Serializer<T> { + /** + * Serializes the given object to an array of bytes. + * @param object Object of specific type to serialize. + * @return An array of bytes representing the object in serialized form. + */ byte[] toBytes(T object); } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f63ce72a/samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java b/samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java index d7a1b90..fe83c8e 100644 --- a/samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java +++ b/samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java @@ -33,21 +33,17 @@ import org.apache.samza.task.MessageCollector; */ public interface StorageEngineFactory<K, V> { - // TODO update javadocs for storage engine factory + // TODO add Javadocs for MetricsRegistry and MessageCollector args /** - * Create an instance of the given storage engine - * - * @param dir - * The directory - * @param config - * The configuration for the task - * @param stream - * The stream used to back this storage engine - * @param serde - * The serializer to use for serializing keys and values when reading - * or writing to the store store - * @return The storage engine instance + * Create an instance of the given storage engine. + * @param storeName The name of the storage engine. + * @param storeDir The directory of the storage engine. + * @param keySerde The serializer to use for serializing keys when reading or writing to the store. + * @param msgSerde The serializer to use for serializing messages when reading or writing to the store. + * @param config The configuration for the task. + * @param changeLogSystemStreamPartition Samza stream partition from which to receive the changelog.
+ * @return The storage engine instance. */ public StorageEngine getStorageEngine( String storeName, http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f63ce72a/samza-api/src/main/java/org/apache/samza/system/OutgoingMessageEnvelope.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/system/OutgoingMessageEnvelope.java b/samza-api/src/main/java/org/apache/samza/system/OutgoingMessageEnvelope.java index 9cbb254..c8ef980 100644 --- a/samza-api/src/main/java/org/apache/samza/system/OutgoingMessageEnvelope.java +++ b/samza-api/src/main/java/org/apache/samza/system/OutgoingMessageEnvelope.java @@ -19,6 +19,10 @@ package org.apache.samza.system; +/** + * This class represents a message envelope that is sent by a StreamTask. It can be thought of as a complement to the + * IncomingMessageEnvelope class. + */ public class OutgoingMessageEnvelope { private final SystemStream systemStream; private final String keySerializerName; @@ -27,6 +31,15 @@ public class OutgoingMessageEnvelope { private final Object key; private final Object message; + /** + * Constructs a new OutgoingMessageEnvelope from specified components. + * @param systemStream Object representing the appropriate stream on which this envelope will be sent. + * @param keySerializerName The name of the serializer used for serializing this envelope's key. + * @param messageSerializerName The name of the serializer used for serializing this envelope's message. + * @param partitionKey A key used to select the partition of the systemStream on which to send this envelope. + * @param key A deserialized key to be used for the message. + * @param message A deserialized message to be sent in this envelope.
+ */ public OutgoingMessageEnvelope(SystemStream systemStream, String keySerializerName, String messageSerializerName, Object partitionKey, Object key, Object message) { this.systemStream = systemStream; this.keySerializerName = keySerializerName; @@ -36,14 +49,32 @@ public class OutgoingMessageEnvelope { this.message = message; } + /** + * Constructs a new OutgoingMessageEnvelope from specified components. + * @param systemStream Object representing the appropriate stream on which this envelope will be sent. + * @param partitionKey A key used to select the partition of the systemStream on which to send this envelope. + * @param key A deserialized key to be used for the message. + * @param message A deserialized message to be sent in this envelope. + */ public OutgoingMessageEnvelope(SystemStream systemStream, Object partitionKey, Object key, Object message) { this(systemStream, null, null, partitionKey, key, message); } + /** + * Constructs a new OutgoingMessageEnvelope from specified components. + * @param systemStream Object representing the appropriate stream on which this envelope will be sent. + * @param key A deserialized key to be used for the message; it is also used as the partition key. + * @param message A deserialized message to be sent in this envelope. + */ public OutgoingMessageEnvelope(SystemStream systemStream, Object key, Object message) { this(systemStream, null, null, key, key, message); } + /** + * Constructs a new OutgoingMessageEnvelope from specified components. + * @param systemStream Object representing the appropriate stream on which this envelope will be sent. + * @param message A deserialized message to be sent in this envelope.
+ */ public OutgoingMessageEnvelope(SystemStream systemStream, Object message) { this(systemStream, null, null, null, null, message); } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f63ce72a/samza-api/src/main/java/org/apache/samza/system/SystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemConsumer.java b/samza-api/src/main/java/org/apache/samza/system/SystemConsumer.java index b182f79..811a9f4 100644 --- a/samza-api/src/main/java/org/apache/samza/system/SystemConsumer.java +++ b/samza-api/src/main/java/org/apache/samza/system/SystemConsumer.java @@ -22,6 +22,9 @@ package org.apache.samza.system; import java.util.List; import java.util.Map; +/** + * Used as a standard interface for all consumers of messages from specific Samza stream partitions. + */ public interface SystemConsumer { public static int BLOCK_ON_OUTSTANDING_MESSAGES = -1; @@ -29,6 +32,13 @@ public interface SystemConsumer { void stop(); + /** + * Registers this consumer to receive messages from a specific offset in a Samza stream partition. + * @param systemStreamPartition The SystemStreamPartition object representing the Samza stream partition to receive + * messages from. + * @param lastReadOffset String representing the offset from which to start receiving messages in the specified + * partition.
+ */ void register(SystemStreamPartition systemStreamPartition, String lastReadOffset); List<IncomingMessageEnvelope> poll(Map<SystemStreamPartition, Integer> systemStreamPartitions, long timeout) throws InterruptedException; http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f63ce72a/samza-api/src/main/java/org/apache/samza/system/SystemProducer.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemProducer.java b/samza-api/src/main/java/org/apache/samza/system/SystemProducer.java index bba53d0..8967f57 100644 --- a/samza-api/src/main/java/org/apache/samza/system/SystemProducer.java +++ b/samza-api/src/main/java/org/apache/samza/system/SystemProducer.java @@ -19,13 +19,25 @@ package org.apache.samza.system; +/** + * Used as a standard interface for all producers of messages from a specified Samza source. + */ public interface SystemProducer { void start(); void stop(); + /** + * Registers this producer to send messages from a specified Samza source, such as a StreamTask. + * @param source String representing the source of the message. + */ void register(String source); + /** + * Sends a specified message envelope from a specified Samza source. + * @param source String representing the source of the message. + * @param envelope Aggregate object representing the serialized message to send from the source. 
+ */ void send(String source, OutgoingMessageEnvelope envelope); void flush(String source); http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f63ce72a/samza-api/src/main/java/org/apache/samza/system/SystemStream.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemStream.java b/samza-api/src/main/java/org/apache/samza/system/SystemStream.java index 1de9317..0265a2c 100644 --- a/samza-api/src/main/java/org/apache/samza/system/SystemStream.java +++ b/samza-api/src/main/java/org/apache/samza/system/SystemStream.java @@ -19,15 +19,27 @@ package org.apache.samza.system; +/** + * Used to represent a Samza stream. + */ public class SystemStream { protected final String system; protected final String stream; + /** + * Constructs a Samza stream object from specified components. + * @param system The name of the system with which this stream is associated. + * @param stream The name of the stream as specified in the stream configuration file. + */ public SystemStream(String system, String stream) { this.system = system; this.stream = stream; } + /** + * Constructs a Samza stream object based upon an existing Samza stream. + * @param other Reference to an already existing Samza stream.
+ */ public SystemStream(SystemStream other) { this(other.getSystem(), other.getStream()); } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f63ce72a/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java b/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java index 6b7ef5e..5173ebd 100644 --- a/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java +++ b/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java @@ -41,7 +41,7 @@ public class SystemStreamPartition extends SystemStream { } /** - * Constructs a Samza stream partition object based upon another Samza stream partition. + * Constructs a Samza stream partition object based upon an existing Samza stream partition. * @param other Reference to an already existing Samza stream partition. */ public SystemStreamPartition(SystemStreamPartition other) { http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f63ce72a/samza-api/src/main/java/org/apache/samza/task/StreamTask.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/task/StreamTask.java b/samza-api/src/main/java/org/apache/samza/task/StreamTask.java index d552514..3d87fbb 100644 --- a/samza-api/src/main/java/org/apache/samza/task/StreamTask.java +++ b/samza-api/src/main/java/org/apache/samza/task/StreamTask.java @@ -22,12 +22,14 @@ package org.apache.samza.task; import org.apache.samza.system.IncomingMessageEnvelope; /** - * Used as a standard interface for all user processing tasks. Receives messages from a partition of a specified input stream. + * Used as a standard interface for all user processing tasks. Receives messages from a partition of a specified input + * stream. 
+ */ public interface StreamTask { /** * Called once for each message that this StreamTask receives. - * @param envelope Contains the received deserialized message and key, and also information regarding the stream and partition of which the message was received from. + * @param envelope Contains the received deserialized message and key, and also information regarding the stream and + * partition from which the message was received. * @param collector Contains the means of sending message envelopes to the output stream. * @param coordinator Manages execution of tasks. * @throws Exception Any exception types encountered during the execution of the processing task.
