SAMZA-820 - Separate checkpoint stream and coordinator stream configurations and update documentation
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/48220501 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/48220501 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/48220501 Branch: refs/heads/samza-sql Commit: 4822050134e00ab9a48f64395bd5f82e33975ddd Parents: 72a558c Author: Navina <[email protected]> Authored: Tue Nov 24 15:51:05 2015 -0800 Committer: Navina <[email protected]> Committed: Tue Nov 24 15:56:49 2015 -0800 ---------------------------------------------------------------------- .../versioned/jobs/configuration-table.html | 20 ++++++++++++++++++++ .../org/apache/samza/config/JobConfig.scala | 11 +++++------ 2 files changed, 25 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/48220501/docs/learn/documentation/versioned/jobs/configuration-table.html ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html index 96fdcc0..2e09cd1 100644 --- a/docs/learn/documentation/versioned/jobs/configuration-table.html +++ b/docs/learn/documentation/versioned/jobs/configuration-table.html @@ -163,6 +163,26 @@ <strong>Required:</strong> The <span class="system">system-name</span> to use for creating and maintaining the <a href="../container/coordinator-stream.html">Coordinator Stream</a>. </td> </tr> + + <tr> + <td class="property" id="job-coordinator-replication-factor">job.coordinator.<br />replication.factor</td> + <td class="default">3</td> + <td class="description"> + If you are using Kafka for coordinator stream, this is the number of Kafka nodes to which you want the + coordinator topic replicated for durability. 
+ </td> + </tr> + + <tr> + <td class="property" id="job-coordinator-segment-bytes">job.coordinator.<br />segment.bytes</td> + <td class="default">26214400</td> + <td class="description"> + If you are using a Kafka system for the coordinator stream, this is the segment size to be used for the coordinator + topic's log segments. Keeping this number small is useful because it increases the frequency + with which Kafka garbage-collects old messages. + </td> + </tr> + <tr> <td class="property" id="job-config-rewriter-class">job.config.rewriter.<br><span class="rewriter">rewriter-name</span>.class</td> <td class="default"></td> http://git-wip-us.apache.org/repos/asf/samza/blob/48220501/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala index 85a1ca4..1a8adae 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala @@ -79,14 +79,14 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging { def getSystemStreamPartitionGrouperFactory = getOption(JobConfig.SSP_GROUPER_FACTORY).getOrElse(classOf[GroupByPartitionFactory].getCanonicalName) val CHECKPOINT_SEGMENT_BYTES = "task.checkpoint.segment.bytes" + val CHECKPOINT_REPLICATION_FACTOR = "task.checkpoint.replication.factor" def getCoordinatorReplicationFactor = getOption(JobConfig.JOB_REPLICATION_FACTOR) match { case Some(rplFactor) => rplFactor case _ => - // TODO get rid of checkpoint configs in a future release - getOption("task.checkpoint.replication.factor") match { + getOption(CHECKPOINT_REPLICATION_FACTOR) match { case Some(rplFactor) => - warn("Configuration 'task.checkpoint.replication.factor' is deprecated. Please use %s."
format JobConfig.JOB_REPLICATION_FACTOR) + info("%s was not found. Using %s=%s for coordinator stream" format (JobConfig.JOB_REPLICATION_FACTOR, CHECKPOINT_REPLICATION_FACTOR, rplFactor)) rplFactor case _ => "3" } @@ -95,10 +95,9 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging { def getCoordinatorSegmentBytes = getOption(JobConfig.JOB_SEGMENT_BYTES) match { case Some(segBytes) => segBytes case _ => - // TODO get rid of checkpoint configs in a future release - getOption("task.checkpoint.segment.bytes") match { + getOption(CHECKPOINT_SEGMENT_BYTES) match { case Some(segBytes) => - warn("Configuration 'task.checkpoint.segment.bytes' is deprecated. Please use %s." format JobConfig.JOB_SEGMENT_BYTES) + info("%s was not found. Using %s=%s for coordinator stream" format (JobConfig.JOB_SEGMENT_BYTES, CHECKPOINT_SEGMENT_BYTES, segBytes)) segBytes case _ => "26214400" }
