Repository: samza Updated Branches: refs/heads/master 60f529379 -> 220cfce5c
SAMZA-978: fix next links in online doc Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/220cfce5 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/220cfce5 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/220cfce5 Branch: refs/heads/master Commit: 220cfce5c11a4e77df3db906ed68bb8efea624ce Parents: 60f5293 Author: Fred Ji <[email protected]> Authored: Mon Aug 1 11:40:09 2016 -0700 Committer: Yi Pan (Data Infrastructure) <[email protected]> Committed: Mon Aug 1 11:40:09 2016 -0700 ---------------------------------------------------------------------- docs/learn/documentation/versioned/api/overview.md | 1 + .../documentation/versioned/comparisons/mupd8.md | 2 +- .../documentation/versioned/comparisons/storm.md | 2 +- .../versioned/container/coordinator-stream.md | 6 ++++-- .../documentation/versioned/container/event-loop.md | 2 +- .../documentation/versioned/container/metrics.md | 2 +- .../versioned/container/samza-container.md | 6 +++--- .../versioned/container/state-management.md | 8 ++++---- .../documentation/versioned/container/windowing.md | 2 +- docs/learn/documentation/versioned/hdfs/producer.md | 15 +++++++-------- docs/learn/documentation/versioned/index.html | 2 -- docs/learn/documentation/versioned/yarn/isolation.md | 6 +++--- .../versioned/yarn/yarn-host-affinity.md | 5 +++-- 13 files changed, 30 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/220cfce5/docs/learn/documentation/versioned/api/overview.md ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/api/overview.md b/docs/learn/documentation/versioned/api/overview.md index 6712344..67e259c 100644 --- a/docs/learn/documentation/versioned/api/overview.md +++ b/docs/learn/documentation/versioned/api/overview.md @@ -139,4 +139,5 @@ public class 
SplitStringIntoWords implements StreamTask { } {% endhighlight %} +For more details on APIs, please refer to [Configuration](../jobs/configuration-table.html) and [Javadocs](javadocs). ## [SamzaContainer »](../container/samza-container.html) http://git-wip-us.apache.org/repos/asf/samza/blob/220cfce5/docs/learn/documentation/versioned/comparisons/mupd8.md ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/comparisons/mupd8.md b/docs/learn/documentation/versioned/comparisons/mupd8.md index 53417f9..b6def6a 100644 --- a/docs/learn/documentation/versioned/comparisons/mupd8.md +++ b/docs/learn/documentation/versioned/comparisons/mupd8.md @@ -29,7 +29,7 @@ MUPD8 makes no durability or delivery guarantees. Within MUPD8, stream processor As with durability, developers would ideally like their stream processors to receive messages in exactly the order that they were written. -We don't entirely follow MUPD8's description of their ordering guarantees, but it seems to guarantee that all messages will be processed in the order in which they are written to MUPD8 queues, which is comparable to Kafka and Samza's guarantee. +Based on our understanding of MUPD8's description of its ordering guarantees, it guarantees that all messages will be processed in the order in which they are written to MUPD8 queues, which is comparable to Kafka and Samza's guarantee. 
### Buffering http://git-wip-us.apache.org/repos/asf/samza/blob/220cfce5/docs/learn/documentation/versioned/comparisons/storm.md ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/comparisons/storm.md b/docs/learn/documentation/versioned/comparisons/storm.md index 58cb508..080b977 100644 --- a/docs/learn/documentation/versioned/comparisons/storm.md +++ b/docs/learn/documentation/versioned/comparisons/storm.md @@ -23,7 +23,7 @@ title: Storm [Storm](http://storm-project.net/) and Samza are fairly similar. Both systems provide many of the same high-level features: a partitioned stream model, a distributed execution environment, an API for stream processing, fault tolerance, Kafka integration, etc. -Storm and Samza use different words for similar concepts: *spouts* in Storm are similar to stream consumers in Samza, *bolts* are similar to tasks, and *tuples* are similar to messages in Samza. Storm also has some additional building blocks which don't have direct equivalents in Samza. +Storm and Samza use different words for similar concepts: *spouts* in Storm are similar to stream consumers in Samza, *bolts* are similar to tasks, and *tuples* are similar to messages in Samza. Some additional building blocks, such as *trident*, *topology*, etc., don't have direct equivalents in Samza. 
### Ordering and Guarantees http://git-wip-us.apache.org/repos/asf/samza/blob/220cfce5/docs/learn/documentation/versioned/container/coordinator-stream.md ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/container/coordinator-stream.md b/docs/learn/documentation/versioned/container/coordinator-stream.md index 98c1a84..b680593 100644 --- a/docs/learn/documentation/versioned/container/coordinator-stream.md +++ b/docs/learn/documentation/versioned/container/coordinator-stream.md @@ -51,7 +51,7 @@ message => { } ``` -The messages are essentially serialized and transmitted over the wire as JSON blobs. Hence, for serialization to work correctly, it is very important to not have any unnecessary white spaces. The white spaces in the above JSON blob have been shown for legibility only. +The messages are essentially serialized and transmitted over the wire as JSON blobs. Hence, for serialization to work correctly, it is very important to not have any unnecessary white spaces. The white spaces in the above JSON blob have been shown for legibility only. The most important fields are type, key, and values: @@ -134,7 +134,7 @@ Thus, Job Coordinator is the single component that has the latest view of the en ### Job Coordinator Availability -The Job Coordinator resides in the same container as the Samza Application Master. Thus, the availability of the Job Coordinator is tied to the availability of the Application Master (AM) in the Yarn cluster. The Samza containers are started only after initializing the Job Coordinator from the Coordinator Stream. In stable condition, when the Samza container comes up, it should be able to read the JobModel from the Job Coordinator without timing out. +The Job Coordinator resides in the same container as the Samza Application Master. Thus, the availability of the Job Coordinator is tied to the availability of the Application Master (AM) in the Yarn cluster. 
The Samza containers are started only after initializing the Job Coordinator from the Coordinator Stream. Under stable conditions, when the Samza container comes up, it should be able to read the JobModel from the Job Coordinator without timing out. ## Benefits of Coordinator Stream Model Writing the configuration to a durable stream opens the door for Samza to do a couple of things: @@ -148,3 +148,5 @@ Writing the configuration to a durable stream opens the door for Samza to do a c For other interesting features that can leverage this model, please refer to the [design document](https://issues.apache.org/jira/secure/attachment/12670650/DESIGN-SAMZA-348-1.pdf). + +## [Event Loop »](event-loop.html) http://git-wip-us.apache.org/repos/asf/samza/blob/220cfce5/docs/learn/documentation/versioned/container/event-loop.md ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/container/event-loop.md b/docs/learn/documentation/versioned/container/event-loop.md index 1162383..9dcf92c 100644 --- a/docs/learn/documentation/versioned/container/event-loop.md +++ b/docs/learn/documentation/versioned/container/event-loop.md @@ -45,4 +45,4 @@ The only way in which a developer can hook into a SamzaContainer's lifecycle is A concrete example is a set of StreamTasks that all want to share the same try/catch logic in their process() method. A StreamTask can be implemented that wraps the original StreamTasks, and surrounds the original process() call with the appropriate try/catch logic. For more details, see [this discussion](https://issues.apache.org/jira/browse/SAMZA-437). 
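The wrapping pattern described in that discussion can be sketched in a few lines. This is a hypothetical illustration: it uses stand-in interfaces instead of Samza's actual org.apache.samza.task types (StreamTask, MessageCollector, TaskCoordinator) so it compiles on its own, and the recovery policy (log and skip) is just one possible choice.

```java
// Stand-ins for Samza's task interfaces, reduced to what the sketch needs.
// In a real job these would be org.apache.samza.task.StreamTask and friends.
interface Envelope {}
interface Collector {}
interface Coordinator {}

interface Task {
    void process(Envelope envelope, Collector collector, Coordinator coordinator) throws Exception;
}

// Wraps another task and surrounds its process() call with shared
// try/catch logic, so every wrapped task handles failures the same way.
class GuardedTask implements Task {
    private final Task delegate;

    GuardedTask(Task delegate) {
        this.delegate = delegate;
    }

    @Override
    public void process(Envelope envelope, Collector collector, Coordinator coordinator) {
        try {
            delegate.process(envelope, collector, coordinator);
        } catch (Exception e) {
            // Shared recovery policy: log and skip the bad message rather
            // than letting the exception kill the container.
            System.err.println("skipping message: " + e.getMessage());
        }
    }
}
```

The same idea extends to other cross-cutting concerns such as metrics or retries: compose tasks rather than copying the logic into each process() method.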
-## [JMX »](jmx.html) +## [Metrics »](metrics.html) http://git-wip-us.apache.org/repos/asf/samza/blob/220cfce5/docs/learn/documentation/versioned/container/metrics.md ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/container/metrics.md b/docs/learn/documentation/versioned/container/metrics.md index 0423155..b053b79 100644 --- a/docs/learn/documentation/versioned/container/metrics.md +++ b/docs/learn/documentation/versioned/container/metrics.md @@ -99,4 +99,4 @@ Samza currently supports three kinds of metrics: [counters](../api/javadocs/org/ If you want to report metrics in some other way, e.g. directly to a graphing system (without going via Kafka), you can implement a [MetricsReporterFactory](../api/javadocs/org/apache/samza/metrics/MetricsReporterFactory.html) and reference it in your job configuration. -## [Windowing »](windowing.html) +## [JMX »](jmx.html) http://git-wip-us.apache.org/repos/asf/samza/blob/220cfce5/docs/learn/documentation/versioned/container/samza-container.md ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/container/samza-container.md b/docs/learn/documentation/versioned/container/samza-container.md index 55bec98..fb91418 100644 --- a/docs/learn/documentation/versioned/container/samza-container.md +++ b/docs/learn/documentation/versioned/container/samza-container.md @@ -104,12 +104,12 @@ There is one caveat in all of this: Samza currently assumes that a stream's part ### Broadcast Streams -After 0.10.0, Samza supports broadcast streams. You can assign partitions from some streams to all the tasks. For example, you want all the tasks can consume partition 0 and 1 from a stream called global-stream-1, and partition 2 from a stream called global-stream-2. You now can configure: +After 0.10.0, Samza supports broadcast streams. 
You can assign partitions from some streams to all the tasks by appending a hash tag and the partition number or partition-number range. For example, if you want all the tasks to consume partitions 0 and 1 from a stream called broadcast-stream-1, and partition 2 from a stream called broadcast-stream-2, you can configure: {% highlight jproperties %} -task.broadcast.inputs=yourSystem.global-stream-1#[0-1], yourSystem.global-stream-2#2 +task.broadcast.inputs=yourSystem.broadcast-stream-1#[0-1], yourSystem.broadcast-stream-2#2 {% endhighlight %} -If you use "[]", you are specifying a range. +If you use "[]", you are specifying a partition range. ## [Streams »](streams.html) http://git-wip-us.apache.org/repos/asf/samza/blob/220cfce5/docs/learn/documentation/versioned/container/state-management.md ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/container/state-management.md b/docs/learn/documentation/versioned/container/state-management.md index 320392c..86b0d44 100644 --- a/docs/learn/documentation/versioned/container/state-management.md +++ b/docs/learn/documentation/versioned/container/state-management.md @@ -209,7 +209,7 @@ samza-example/target/bin/read-rocksdb-tool.sh \ {% endhighlight %} * `--config-path`(required): your job's configuration file -* `--db-path`(required): the location of your RocksDB. This is convenient if the RocksDB is in the same machine as the tool. E.g. if you are running hello-samza in your local machine, the location maybe in +* `--db-path`(required): the location of your RocksDB. This is convenient if the RocksDB is on the same machine as the tool. E.g. if you are running hello-samza on your local machine, the location may be in _/tmp/hadoop/nm-local-dir/usercache/username/appcache/applicationId/containerId/state/storeName/PartitionNumber_ * `--db-name`(required): if you only have one state store specified in the config file, you can ignore this one. 
Otherwise, you need to provide the state store name here. * `--string-key`: the key list. This one only works if your keys are strings. There are also two other options: `--integer-key`, `--long-key`. They work for integer keys and long keys respectively. @@ -273,9 +273,9 @@ Implementation: Partition the ad click and ad impression streams by the impressi ### Other storage engines -Samza's fault-tolerance mechanism (sending a local store's writes to a replicated changelog) is completely decoupled from the storage engine's data structures and query APIs. While a key-value storage engine is good for general-purpose processing, you can easily add your own storage engines for other types of queries by implementing the [StorageEngine](../api/javadocs/org/apache/samza/storage/StorageEngine.html) interface. Samza's model is especially amenable to embedded storage engines, which run as a library in the same process as the stream task. +Samza's fault-tolerance mechanism (sending a local store's writes to a replicated changelog) is completely decoupled from the storage engine's data structures and query APIs. While a key-value storage engine is good for general-purpose processing, you can easily add your own storage engines for other types of queries by implementing the [StorageEngine](../api/javadocs/org/apache/samza/storage/StorageEngine.html) interface. Samza's model is especially amenable to embedded storage engines, which run as a library in the same process as the stream task. -Some ideas for other storage engines that could be useful: a persistent heap (for running top-N queries), [approximate algorithms](http://infolab.stanford.edu/~ullman/mmds/ch4.pdf) such as [bloom filters](http://en.wikipedia.org/wiki/Bloom_filter) and [hyperloglog](http://research.google.com/pubs/pub40671.html), or full-text indexes such as [Lucene](http://lucene.apache.org). (Patches accepted!) 
+Some ideas for other storage engines that could be useful: a persistent heap (for running top-N queries), [approximate algorithms](http://infolab.stanford.edu/~ullman/mmds/ch4.pdf) such as [bloom filters](http://en.wikipedia.org/wiki/Bloom_filter) and [hyperloglog](http://research.google.com/pubs/pub40671.html), or full-text indexes such as [Lucene](http://lucene.apache.org). (Patches welcome!) ### Fault tolerance semantics with state @@ -285,4 +285,4 @@ For many of the stateful processing use cases discussed above, this is not a pro However, for non-idempotent operations such as counting, at-least-once delivery guarantees can give incorrect results. If a Samza task fails and is restarted, it may double-count some messages that were processed shortly before the failure. We are planning to address this limitation in a future release of Samza. -## [Metrics »](metrics.html) +## [Windowing »](windowing.html) http://git-wip-us.apache.org/repos/asf/samza/blob/220cfce5/docs/learn/documentation/versioned/container/windowing.md ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/container/windowing.md b/docs/learn/documentation/versioned/container/windowing.md index b10e5d4..319062a 100644 --- a/docs/learn/documentation/versioned/container/windowing.md +++ b/docs/learn/documentation/versioned/container/windowing.md @@ -58,4 +58,4 @@ If you need to send messages to output streams, you can use the [MessageCollecto Note that Samza uses [single-threaded execution](event-loop.html), so the window() call can never happen concurrently with a process() call. This has the advantage that you don't need to worry about thread safety in your code (no need to synchronize anything), but the downside that the window() call may be delayed if your process() method takes a long time to return. 
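The window()/process() interplay described above can be sketched as a per-window counter. This is a simplified stand-in for the real WindowableTask contract, not Samza's actual API; it leans on the single-threaded guarantee just mentioned, which is why the mutable counter needs no synchronization.

```java
// Stand-in for Samza's WindowableTask contract: process() is called once
// per message, window() is called periodically, and the event loop
// guarantees the two never run concurrently.
class WindowedCounterSketch {
    private int count = 0;  // safe without locks: single-threaded event loop

    // Called for every incoming message.
    void process(Object message) {
        count++;
    }

    // Called on the window interval; returns the count for the closing
    // window and resets it. In a real task this would send the result
    // downstream via the MessageCollector instead of returning it.
    int window() {
        int result = count;
        count = 0;
        return result;
    }
}
```

Because window() resets state for the next interval, a slow process() call delays the window boundary, which is exactly the trade-off the paragraph above describes.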
-## [Event Loop »](event-loop.html) +## [Coordinator Stream »](coordinator-stream.html) http://git-wip-us.apache.org/repos/asf/samza/blob/220cfce5/docs/learn/documentation/versioned/hdfs/producer.md ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/hdfs/producer.md b/docs/learn/documentation/versioned/hdfs/producer.md index 0865a35..b0e936f 100644 --- a/docs/learn/documentation/versioned/hdfs/producer.md +++ b/docs/learn/documentation/versioned/hdfs/producer.md @@ -1,6 +1,6 @@ --- layout: page -title: Isolation +title: Writing to HDFS --- <!-- Licensed to the Apache Software Foundation (ASF) under one or more @@ -19,10 +19,8 @@ title: Isolation limitations under the License. --> -### Writing to HDFS from Samza - -The `samza-hdfs` module implements a Samza Producer to write to HDFS. The current implementation includes a ready-to-use `HdfsSystemProducer`, and three `HdfsWriter`s: One that writes messages of raw bytes to a `SequenceFile` of `BytesWritable` keys and values. Another writes UTF-8 `String`s to a `SequenceFile` with `LongWritable` keys and `Text` values. -The last one writes out Avro data files including the schema automatically reflected from the POJO objects fed to it. +The `samza-hdfs` module implements a Samza Producer to write to HDFS. The current implementation includes a ready-to-use `HdfsSystemProducer`, and three `HdfsWriter`s: One that writes messages of raw bytes to a `SequenceFile` of `BytesWritable` keys and values. Another writes UTF-8 `String`s to a `SequenceFile` with `LongWritable` keys and `Text` values. +The last one writes out Avro data files including the schema automatically reflected from the POJO objects fed to it. 
### Configuring an HdfsSystemProducer @@ -34,10 +32,10 @@ You might configure the system producer for use by your `StreamTasks` like this: systems.hdfs-clickstream.samza.factory=org.apache.samza.system.hdfs.HdfsSystemFactory # define a serializer/deserializer for the hdfs-clickstream system -# DO NOT define (i.e. comment out) a SerDe when using the AvroDataFileHdfsWriter so it can reflect the schema +# DO NOT define (i.e. comment out) a SerDe when using the AvroDataFileHdfsWriter so it can reflect the schema systems.hdfs-clickstream.samza.msg.serde=some-serde-impl -# consumer configs not needed for HDFS system, reader is not implemented yet +# consumer configs not needed for HDFS system, reader is not implemented yet # Assign a Metrics implementation via a label we defined earlier in the props file systems.hdfs-clickstream.streams.metrics.samza.msg.serde=some-metrics-impl @@ -60,7 +58,7 @@ systems.hdfs-clickstream.producer.hdfs.bucketer.class=org.apache.samza.system.hd # Configure the DATE_PATH the Bucketer will set to bucket output files by day for this job run. systems.hdfs-clickstream.producer.hdfs.bucketer.date.path.format=yyyy_MM_dd -# Optionally set the max output bytes (records for AvroDataFileHdfsWriter) per file. +# Optionally set the max output bytes (records for AvroDataFileHdfsWriter) per file. # A new file will be cut and output continued on the next write call each time this many bytes # (records for AvroDataFileHdfsWriter) are written. systems.hdfs-clickstream.producer.hdfs.write.batch.size.bytes=134217728 @@ -69,3 +67,4 @@ systems.hdfs-clickstream.producer.hdfs.write.batch.size.bytes=134217728 The above configuration assumes a Metrics and Serde implementation has been properly configured against the `some-serde-impl` and `some-metrics-impl` labels somewhere else in the same `job.properties` file. Each of these properties has a reasonable default, so you can leave out the ones you don't need to customize for your job run. 
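With a system like hdfs-clickstream configured, a task writes to HDFS the same way it writes to any other Samza system: by sending an envelope addressed to that system name. The sketch below uses stand-in classes in place of Samza's org.apache.samza.system.SystemStream and OutgoingMessageEnvelope so it compiles standalone, and the stream name "metrics" is only an example; the real API takes the same (system, stream, message) triple.

```java
// Stand-ins for org.apache.samza.system.SystemStream and
// OutgoingMessageEnvelope, reduced to the fields used in this sketch.
class SystemStream {
    final String system, stream;
    SystemStream(String system, String stream) {
        this.system = system;
        this.stream = stream;
    }
}

class OutgoingMessageEnvelope {
    final SystemStream systemStream;
    final Object message;
    OutgoingMessageEnvelope(SystemStream systemStream, Object message) {
        this.systemStream = systemStream;
        this.message = message;
    }
}

class HdfsSendSketch {
    // The system name must match the config above ("hdfs-clickstream");
    // the stream name ("metrics" here, purely illustrative) becomes part
    // of the HDFS output path chosen by the bucketer.
    static OutgoingMessageEnvelope toHdfs(Object message) {
        return new OutgoingMessageEnvelope(
            new SystemStream("hdfs-clickstream", "metrics"), message);
    }
}
```

In an actual StreamTask, such an envelope would be handed to the MessageCollector in process(), and the configured HdfsWriter decides how the payload is encoded on disk.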
+## [Security »](../operations/security.html) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/220cfce5/docs/learn/documentation/versioned/index.html ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/index.html b/docs/learn/documentation/versioned/index.html index 1e79bd6..84e15a2 100644 --- a/docs/learn/documentation/versioned/index.html +++ b/docs/learn/documentation/versioned/index.html @@ -45,8 +45,6 @@ title: Documentation <ul class="documentation-list"> <li><a href="api/overview.html">Overview</a></li> - <li><a href="jobs/configuration-table.html">Configuration</a></li> - <li><a href="api/javadocs">Javadocs</a></li> </ul> <h4>Core</h4> http://git-wip-us.apache.org/repos/asf/samza/blob/220cfce5/docs/learn/documentation/versioned/yarn/isolation.md ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/yarn/isolation.md b/docs/learn/documentation/versioned/yarn/isolation.md index a76f6fd..008a640 100644 --- a/docs/learn/documentation/versioned/yarn/isolation.md +++ b/docs/learn/documentation/versioned/yarn/isolation.md @@ -37,10 +37,10 @@ YARN has the concept of a virtual core. Each NM is assigned a total number of vi #### CGroups -Unlike memory, which YARN can enforce itself (by looking at the /proc folder), YARN can't enforce CPU isolation, since this must be done at the Linux kernel level. One of YARN's interesting new features is its support for Linux [CGroups](https://www.kernel.org/doc/Documentation/cgroups/cgroups.txt). CGroups are a way to control process utilization at the kernel level in Linux. +Unlike memory, which YARN can enforce itself (by looking at the /proc folder), YARN can't enforce CPU isolation, since this must be done at the Linux kernel level. One of YARN's interesting new features is its support for Linux [CGroups](https://www.kernel.org/doc/Documentation/cgroup-v1/cgroups.txt). 
CGroups are a way to control process utilization at the kernel level in Linux. -If YARN is setup to use CGroups, then YARN will guarantee that a container will get at least the amount of CPU that it requires. Currently, YARN will give you more CPU, if it's available. For details on enforcing "at most" CPU usage, see [YARN-810](https://issues.apache.org/jira/browse/YARN-810). +If YARN is set up to use CGroups, then YARN will guarantee that a container will get at least the amount of CPU that it requires. Currently, YARN will give you more CPU if it's available. For details on enforcing "at most" CPU usage, see [YARN-810](https://issues.apache.org/jira/browse/YARN-810). See [this blog post](http://riccomini.name/posts/hadoop/2013-06-14-yarn-with-cgroups/) for details on setting up YARN with CGroups. -## [YARN Security »](../yarn/yarn-secuirty.html) +## [Host Affinity & YARN »](../yarn/yarn-host-affinity.html) http://git-wip-us.apache.org/repos/asf/samza/blob/220cfce5/docs/learn/documentation/versioned/yarn/yarn-host-affinity.md ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/yarn/yarn-host-affinity.md b/docs/learn/documentation/versioned/yarn/yarn-host-affinity.md index af6367d..13bc9b9 100644 --- a/docs/learn/documentation/versioned/yarn/yarn-host-affinity.md +++ b/docs/learn/documentation/versioned/yarn/yarn-host-affinity.md @@ -19,7 +19,7 @@ title: Host Affinity & YARN limitations under the License. --> -In Samza, containers are the units of physical parallelism that runs on a set of machines. Each container is essentially a process that runs one or more stream tasks. Each task instance consumes one or more partitions of the input streams and is associated with its own durable data store. +In Samza, containers are the units of physical parallelism that run on a set of machines. Each container is essentially a process that runs one or more stream tasks. 
Each task instance consumes one or more partitions of the input streams and is associated with its own durable data store. We define a *Stateful Samza Job* as a Samza job that uses a key-value store in its implementation, along with an associated changelog stream. In stateful Samza jobs, a task may be configured to use multiple stores. For each store there is a 1:1 mapping between the task instance and the data store. Since the allocation of containers to machines in the Yarn cluster is completely left to Yarn, Samza does not guarantee that a container (and hence, its associated task(s)) gets deployed on the same machine. Containers can get shuffled in any of the following cases: @@ -70,7 +70,7 @@ Note that the Yarn cluster has to be configured to use [Fair Scheduler](https:// ## Configuring YARN cluster to support Host Affinity -1. Enable local state re-use by setting the <code>LOGGED\_STORE\_BASE\_DIR</code> environment variable in yarn-env.sh {% highlight bash %} +1. Enable local state re-use by setting the <code>LOGGED\_STORE\_BASE\_DIR</code> environment variable in yarn-env.sh {% highlight bash %} export LOGGED_STORE_BASE_DIR=<path-for-state-stores> {% endhighlight %} Without this configuration, the state stores are not persisted upon a container shutdown. This effectively means you will not re-use local state and hence, host-affinity becomes moot. 2. Configure Yarn to use Fair Scheduler and enable continuous-scheduling in yarn-site.xml {% highlight xml %} @@ -119,3 +119,4 @@ As you have observed, host-affinity cannot be guaranteed all the time due to var 1. _When the number of containers and/or container-task assignment changes across successive application runs_ - We may be able to re-use local state for a subset of partitions. Currently, there is no logic in the Job Coordinator to handle partitioning of tasks among containers intelligently. 
Handling this is more involved as it relates to [auto-scaling](https://issues.apache.org/jira/browse/SAMZA-336) of the containers. However, with [task-container mapping](https://issues.apache.org/jira/browse/SAMZA-906), this will work better for typical container count adjustments. 2. _When SystemStreamPartitionGrouper changes across successive application runs_ - When the grouper logic used to distribute the partitions across containers changes, the data in the Coordinator Stream (for changelog-task partition assignment, etc.) and the data stores become invalid. Thus, to be safe, we should flush out all state-related data from the Coordinator Stream. An alternative is to overwrite the Task-ChangelogPartition assignment message and the Container Locality message in the Coordinator Stream, before starting up the job again. +## [Writing to HDFS »](../hdfs/producer.html) \ No newline at end of file
