Dmytro,

We can discuss your questions on the jira.

Thanks,

Jun

On Wed, Nov 5, 2014 at 8:18 AM, Dmytro Kostiuchenko <
dmytro.kostiuche...@gmail.com> wrote:

> Hi, I decided to fix KAFKA-1667.
>
> Currently I have an initial patch, which seems to work. I would like
> to know whether the overall code is OK. Also, there are a few TODOs in
> the code:
>
> 1. I haven't added documentation to the properties, as ConfigDef
> suggests. Should I?
> 2. I'm not sure what Importance should be assigned to the properties. It
> is MEDIUM for all properties. Where can I find some info on this?
> 3. I'm not totally sure that the validations are correct. I tried to
> figure them out from the code, but I still might have missed something.
>
> Finally, is this mailing list the right place to ask such questions,
> or should I submit the patch to the Jira ticket and get a review there
> even if I'm not sure about its quality?
>
> Thanks for the help.
>
> The patch itself:
>
> diff --git
> a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
> b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
> index c4cea2c..347e252 100644
> --- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
> +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
> @@ -19,6 +19,7 @@ import java.util.Comparator;
>  import java.util.HashMap;
>  import java.util.List;
>  import java.util.Map;
> +import java.util.Set;
>
>  /**
>   * This class is used for specifying the set of expected
> configurations, their type, their defaults, their
> @@ -49,6 +50,14 @@ public class ConfigDef {
>      private final Map<String, ConfigKey> configKeys = new
> HashMap<String, ConfigKey>();
>
>      /**
> +     * Returns unmodifiable set of properties names defined in this
> {@linkplain ConfigDef}
> +     * @return new unmodifiable {@link Set} instance containing the keys
> +     */
> +    public Set<String> names() {
> +        return Collections.unmodifiableSet(configKeys.keySet());
> +    }
> +
> +    /**
>       * Define a new configuration
>       * @param name The name of the config parameter
>       * @param type The type of the config
> diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala
> b/core/src/main/scala/kafka/admin/TopicCommand.scala
> index 0b2735e..285c033 100644
> --- a/core/src/main/scala/kafka/admin/TopicCommand.scala
> +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
> @@ -256,7 +256,7 @@ object TopicCommand {
>                           .ofType(classOf[String])
>      val nl = System.getProperty("line.separator")
>      val configOpt = parser.accepts("config", "A topic configuration
> override for the topic being created or altered."  +
> -                                                         "The
> following is a list of valid configurations: " + nl +
> LogConfig.ConfigNames.map("\t" + _).mkString(nl) + nl +
> +                                                         "The
> following is a list of valid configurations: " + nl +
> LogConfig.configNames.map("\t" + _).mkString(nl) + nl +
>                                                           "See the
> Kafka documentation for full details on the topic configs.")
>                            .withRequiredArg
>                            .describedAs("name=value")
> diff --git a/core/src/main/scala/kafka/log/LogConfig.scala
> b/core/src/main/scala/kafka/log/LogConfig.scala
> index e48922a..3e0a986 100644
> --- a/core/src/main/scala/kafka/log/LogConfig.scala
> +++ b/core/src/main/scala/kafka/log/LogConfig.scala
> @@ -21,7 +21,7 @@ import java.util.Properties
>  import org.apache.kafka.common.utils.Utils
>
>  import scala.collection._
> -import kafka.common._
> +import org.apache.kafka.common.config.ConfigDef
>
>  object Defaults {
>    val SegmentSize = 1024 * 1024
> @@ -106,6 +106,10 @@ case class LogConfig(val segmentSize: Int =
> Defaults.SegmentSize,
>  }
>
>  object LogConfig {
> +
> +  val Delete = "delete"
> +  val Compact = "compact"
> +
>    val SegmentBytesProp = "segment.bytes"
>    val SegmentMsProp = "segment.ms"
>    val SegmentJitterMsProp = "segment.jitter.ms"
> @@ -123,46 +127,61 @@ object LogConfig {
>    val UncleanLeaderElectionEnableProp = "unclean.leader.election.enable"
>    val MinInSyncReplicasProp = "min.insync.replicas"
>
> -  val ConfigNames = Set(SegmentBytesProp,
> -                        SegmentMsProp,
> -                        SegmentJitterMsProp,
> -                        SegmentIndexBytesProp,
> -                        FlushMessagesProp,
> -                        FlushMsProp,
> -                        RetentionBytesProp,
> -                        RententionMsProp,
> -                        MaxMessageBytesProp,
> -                        IndexIntervalBytesProp,
> -                        FileDeleteDelayMsProp,
> -                        DeleteRetentionMsProp,
> -                        MinCleanableDirtyRatioProp,
> -                        CleanupPolicyProp,
> -                        UncleanLeaderElectionEnableProp,
> -                        MinInSyncReplicasProp)
> +  private val configDef = {
> +    import ConfigDef.Range._
> +    import ConfigDef.ValidString._
> +    import ConfigDef.Type._
> +    import ConfigDef.Importance._
> +    import java.util.Arrays.asList
> +
> +    // TODO clarify importance
> +    // TODO clarify validations
> +    // TODO define documentation
> +    new ConfigDef()
> +    .define(SegmentBytesProp, INT, Defaults.SegmentSize, atLeast(0),
> MEDIUM, "")
> +    .define(SegmentMsProp, LONG, Defaults.SegmentMs, atLeast(0), MEDIUM,
> "")
> +    .define(SegmentJitterMsProp, LONG, Defaults.SegmentJitterMs,
> atLeast(0), MEDIUM, "")
> +    .define(SegmentIndexBytesProp, INT, Defaults.MaxIndexSize,
> atLeast(0), MEDIUM, "")
> +    .define(FlushMessagesProp, LONG, Defaults.FlushInterval,
> atLeast(0), MEDIUM, "")
> +    .define(FlushMsProp, LONG, Defaults.FlushMs, atLeast(0), MEDIUM, "")
> +    .define(RetentionBytesProp, LONG, Defaults.RetentionSize,
> atLeast(0), MEDIUM, "")
> +    .define(RententionMsProp, LONG, Defaults.RetentionMs, atLeast(0),
> MEDIUM, "")
> +    .define(MaxMessageBytesProp, INT, Defaults.MaxMessageSize,
> atLeast(0), MEDIUM, "")
> +    .define(IndexIntervalBytesProp, INT, Defaults.IndexInterval,
> atLeast(0), MEDIUM, "")
> +    .define(DeleteRetentionMsProp, LONG, Defaults.DeleteRetentionMs,
> atLeast(0), MEDIUM, "")
> +    .define(FileDeleteDelayMsProp, LONG, Defaults.FileDeleteDelayMs,
> atLeast(0), MEDIUM, "")
> +    .define(MinCleanableDirtyRatioProp, DOUBLE,
> Defaults.MinCleanableDirtyRatio, atLeast(0), MEDIUM, "")
> +    .define(CleanupPolicyProp, STRING, if (Defaults.Compact) Delete
> else Compact, in(asList(Compact, Delete)), MEDIUM, "")
> +    .define(UncleanLeaderElectionEnableProp, BOOLEAN,
> Defaults.UncleanLeaderElectionEnable, MEDIUM, "")
> +    .define(MinInSyncReplicasProp, INT, Defaults.MinInSyncReplicas,
> atLeast(1), MEDIUM, "")
> +  }
> +
> +  def configNames() = {
> +    import JavaConversions._
> +    configDef.names().toList.sorted
> +  }
>
>    /**
>     * Parse the given properties instance into a LogConfig object
>     */
>    def fromProps(props: Properties): LogConfig = {
> -    new LogConfig(segmentSize = props.getProperty(SegmentBytesProp,
> Defaults.SegmentSize.toString).toInt,
> -                  segmentMs = props.getProperty(SegmentMsProp,
> Defaults.SegmentMs.toString).toLong,
> -                  segmentJitterMs =
> props.getProperty(SegmentJitterMsProp,
> Defaults.SegmentJitterMs.toString).toLong,
> -                  maxIndexSize =
> props.getProperty(SegmentIndexBytesProp,
> Defaults.MaxIndexSize.toString).toInt,
> -                  flushInterval =
> props.getProperty(FlushMessagesProp,
> Defaults.FlushInterval.toString).toLong,
> -                  flushMs = props.getProperty(FlushMsProp,
> Defaults.FlushMs.toString).toLong,
> -                  retentionSize =
> props.getProperty(RetentionBytesProp,
> Defaults.RetentionSize.toString).toLong,
> -                  retentionMs = props.getProperty(RententionMsProp,
> Defaults.RetentionMs.toString).toLong,
> -                  maxMessageSize =
> props.getProperty(MaxMessageBytesProp,
> Defaults.MaxMessageSize.toString).toInt,
> -                  indexInterval =
> props.getProperty(IndexIntervalBytesProp,
> Defaults.IndexInterval.toString).toInt,
> -                  fileDeleteDelayMs =
> props.getProperty(FileDeleteDelayMsProp,
> Defaults.FileDeleteDelayMs.toString).toInt,
> -                  deleteRetentionMs =
> props.getProperty(DeleteRetentionMsProp,
> Defaults.DeleteRetentionMs.toString).toLong,
> -                  minCleanableRatio =
> props.getProperty(MinCleanableDirtyRatioProp,
> -                    Defaults.MinCleanableDirtyRatio.toString).toDouble,
> -                  compact = props.getProperty(CleanupPolicyProp,
> if(Defaults.Compact) "compact" else "delete")
> -                    .trim.toLowerCase != "delete",
> -                  uncleanLeaderElectionEnable =
> props.getProperty(UncleanLeaderElectionEnableProp,
> -
> Defaults.UncleanLeaderElectionEnable.toString).toBoolean,
> -                  minInSyncReplicas =
>
> props.getProperty(MinInSyncReplicasProp,Defaults.MinInSyncReplicas.toString).toInt)
> +    val parsed = configDef.parse(props)
> +    new LogConfig(segmentSize =
> parsed.get(SegmentBytesProp).asInstanceOf[Int],
> +                  segmentMs =
> parsed.get(SegmentMsProp).asInstanceOf[Long],
> +                  segmentJitterMs =
> parsed.get(SegmentJitterMsProp).asInstanceOf[Long],
> +                  maxIndexSize =
> parsed.get(SegmentIndexBytesProp).asInstanceOf[Int],
> +                  flushInterval =
> parsed.get(FlushMessagesProp).asInstanceOf[Long],
> +                  flushMs = parsed.get(FlushMsProp).asInstanceOf[Long],
> +                  retentionSize =
> parsed.get(RetentionBytesProp).asInstanceOf[Long],
> +                  retentionMs =
> parsed.get(RententionMsProp).asInstanceOf[Long],
> +                  maxMessageSize =
> parsed.get(MaxMessageBytesProp).asInstanceOf[Int],
> +                  indexInterval =
> parsed.get(IndexIntervalBytesProp).asInstanceOf[Int],
> +                  fileDeleteDelayMs =
> parsed.get(FileDeleteDelayMsProp).asInstanceOf[Int],
> +                  deleteRetentionMs =
> parsed.get(DeleteRetentionMsProp).asInstanceOf[Long],
> +                  minCleanableRatio =
> parsed.get(MinCleanableDirtyRatioProp).asInstanceOf[Double],
> +                  compact =
> parsed.get(CleanupPolicyProp).asInstanceOf[String].toLowerCase !=
> Delete,
> +                  uncleanLeaderElectionEnable =
> parsed.get(UncleanLeaderElectionEnableProp).asInstanceOf[Boolean],
> +                  minInSyncReplicas =
> parsed.get(MinInSyncReplicasProp).asInstanceOf[Int])
>    }
>
>    /**
> @@ -179,30 +198,17 @@ object LogConfig {
>     */
>    def validateNames(props: Properties) {
>      import JavaConversions._
> +    val names = configDef.names()
>      for(name <- props.keys)
> -      require(LogConfig.ConfigNames.contains(name), "Unknown
> configuration \"%s\".".format(name))
> +      require(names.contains(name), "Unknown configuration
> \"%s\".".format(name))
>    }
>
>    /**
> -   * Check that the given properties contain only valid log config
> names, and that all values can be parsed.
> +   * Check that the given properties contain only valid log config
> names and that all values can be parsed and are valid
>     */
>    def validate(props: Properties) {
>      validateNames(props)
> -    validateMinInSyncReplicas(props)
> -    LogConfig.fromProps(LogConfig().toProps, props) // check that we
> can parse the values
> -  }
> -
> -  /**
> -   * Check that MinInSyncReplicas is reasonable
> -   * Unfortunately, we can't validate its smaller than number of replicas
> -   * since we don't have this information here
> -   */
> -  private def validateMinInSyncReplicas(props: Properties) {
> -    val minIsr = props.getProperty(MinInSyncReplicasProp)
> -    if (minIsr != null && minIsr.toInt < 1) {
> -      throw new InvalidConfigException("Wrong value " + minIsr + " of
> min.insync.replicas in topic configuration; " +
> -        " Valid values are at least 1")
> -    }
> +    configDef.parse(props)
>    }
>
> -}
> \ No newline at end of file
> +}
>

Reply via email to