SAMZA-820 - Separate checkpoint stream and coordinator stream configurations 
and update documentation


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/48220501
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/48220501
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/48220501

Branch: refs/heads/samza-sql
Commit: 4822050134e00ab9a48f64395bd5f82e33975ddd
Parents: 72a558c
Author: Navina <[email protected]>
Authored: Tue Nov 24 15:51:05 2015 -0800
Committer: Navina <[email protected]>
Committed: Tue Nov 24 15:56:49 2015 -0800

----------------------------------------------------------------------
 .../versioned/jobs/configuration-table.html     | 20 ++++++++++++++++++++
 .../org/apache/samza/config/JobConfig.scala     | 11 +++++------
 2 files changed, 25 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/48220501/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html 
b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 96fdcc0..2e09cd1 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -163,6 +163,26 @@
                         <strong>Required:</strong> The <span 
class="system">system-name</span> to use for creating and maintaining the <a 
href="../container/coordinator-stream.html">Coordinator Stream</a>.
                     </td>
                 </tr>
+
+                <tr>
+                    <td class="property" 
id="job-coordinator-replication-factor">job.coordinator.<br 
/>replication.factor</td>
+                    <td class="default">3</td>
+                    <td class="description">
+                        If you are using Kafka for the coordinator stream, this is 
the number of Kafka nodes to which you want the
+                        coordinator topic replicated for durability.
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" 
id="job-coordinator-segment-bytes">job.coordinator.<br />segment.bytes</td>
+                    <td class="default">26214400</td>
+                    <td class="description">
+                        If you are using a Kafka system for the coordinator 
stream, this is the segment size to be used for the coordinator
+                        topic's log segments. Keeping this number small is 
useful because it increases the frequency with which
+                        Kafka garbage-collects old messages.
+                    </td>
+                </tr>
+
                 <tr>
                     <td class="property" 
id="job-config-rewriter-class">job.config.rewriter.<br><span 
class="rewriter">rewriter-name</span>.class</td>
                     <td class="default"></td>

http://git-wip-us.apache.org/repos/asf/samza/blob/48220501/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 
b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 85a1ca4..1a8adae 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -79,14 +79,14 @@ class JobConfig(config: Config) extends 
ScalaMapConfig(config) with Logging {
   def getSystemStreamPartitionGrouperFactory = 
getOption(JobConfig.SSP_GROUPER_FACTORY).getOrElse(classOf[GroupByPartitionFactory].getCanonicalName)
 
   val CHECKPOINT_SEGMENT_BYTES = "task.checkpoint.segment.bytes"
+  val CHECKPOINT_REPLICATION_FACTOR = "task.checkpoint.replication.factor"
 
   def getCoordinatorReplicationFactor = 
getOption(JobConfig.JOB_REPLICATION_FACTOR) match {
     case Some(rplFactor) => rplFactor
     case _ =>
-      // TODO get rid of checkpoint configs in a future release
-      getOption("task.checkpoint.replication.factor") match {
+      getOption(CHECKPOINT_REPLICATION_FACTOR) match {
         case Some(rplFactor) =>
-          warn("Configuration 'task.checkpoint.replication.factor' is 
deprecated. Please use %s." format JobConfig.JOB_REPLICATION_FACTOR)
+          info("%s was not found. Using %s=%s for coordinator stream" format 
(JobConfig.JOB_REPLICATION_FACTOR, CHECKPOINT_REPLICATION_FACTOR, rplFactor))
           rplFactor
         case _ => "3"
       }
@@ -95,10 +95,9 @@ class JobConfig(config: Config) extends 
ScalaMapConfig(config) with Logging {
   def getCoordinatorSegmentBytes = getOption(JobConfig.JOB_SEGMENT_BYTES) 
match {
     case Some(segBytes) => segBytes
     case _ =>
-      // TODO get rid of checkpoint configs in a future release
-      getOption("task.checkpoint.segment.bytes") match {
+      getOption(CHECKPOINT_SEGMENT_BYTES) match {
         case Some(segBytes) =>
-          warn("Configuration 'task.checkpoint.segment.bytes' is deprecated. 
Please use %s." format JobConfig.JOB_SEGMENT_BYTES)
+          info("%s was not found. Using %s=%s for coordinator stream" format 
(JobConfig.JOB_SEGMENT_BYTES, CHECKPOINT_SEGMENT_BYTES, segBytes))
           segBytes
         case _ => "26214400"
       }

Reply via email to