Dmytro, we can discuss your questions on the JIRA ticket (KAFKA-1667).
Thanks, Jun On Wed, Nov 5, 2014 at 8:18 AM, Dmytro Kostiuchenko < dmytro.kostiuche...@gmail.com> wrote: > Hi, I decided to fix KAFKA-1667. > > Currently I have an initial patch, which seems to work. I would like > to know whether the overall code is ok. Also, there are a few TODOs in the > code: > > 1. I haven't added documentation to the properties, as ConfigDef > suggests. Should I? > 2. I'm not sure what Importance should be assigned to properties. It > is MEDIUM for all properties. Where can I find some info on this? > 3. I'm not totally sure that the validations are correct. I tried to figure > that out from the code, but I still might have missed something. > > Finally, is this mailing list the right place to ask such questions, > or should I submit the patch to the Jira ticket and get a review there even if > I'm not sure about its quality? > > Thanks for the help. > > The patch itself: > > diff --git > a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java > b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java > index c4cea2c..347e252 100644 > --- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java > +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java > @@ -19,6 +19,7 @@ import java.util.Comparator; > import java.util.HashMap; > import java.util.List; > import java.util.Map; > +import java.util.Set; > > /** > * This class is used for specifying the set of expected > configurations, their type, their defaults, their > @@ -49,6 +50,14 @@ public class ConfigDef { > private final Map<String, ConfigKey> configKeys = new > HashMap<String, ConfigKey>(); > > /** > + * Returns unmodifiable set of properties names defined in this > {@linkplain ConfigDef} > + * @return new unmodifiable {@link Set} instance containing the keys > + */ > + public Set<String> names() { > + return Collections.unmodifiableSet(configKeys.keySet()); > + } > + > + /** > * Define a new configuration > * @param name The name of the config parameter > * @param 
type The type of the config > diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala > b/core/src/main/scala/kafka/admin/TopicCommand.scala > index 0b2735e..285c033 100644 > --- a/core/src/main/scala/kafka/admin/TopicCommand.scala > +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala > @@ -256,7 +256,7 @@ object TopicCommand { > .ofType(classOf[String]) > val nl = System.getProperty("line.separator") > val configOpt = parser.accepts("config", "A topic configuration > override for the topic being created or altered." + > - "The > following is a list of valid configurations: " + nl + > LogConfig.ConfigNames.map("\t" + _).mkString(nl) + nl + > + "The > following is a list of valid configurations: " + nl + > LogConfig.configNames.map("\t" + _).mkString(nl) + nl + > "See the > Kafka documentation for full details on the topic configs.") > .withRequiredArg > .describedAs("name=value") > diff --git a/core/src/main/scala/kafka/log/LogConfig.scala > b/core/src/main/scala/kafka/log/LogConfig.scala > index e48922a..3e0a986 100644 > --- a/core/src/main/scala/kafka/log/LogConfig.scala > +++ b/core/src/main/scala/kafka/log/LogConfig.scala > @@ -21,7 +21,7 @@ import java.util.Properties > import org.apache.kafka.common.utils.Utils > > import scala.collection._ > -import kafka.common._ > +import org.apache.kafka.common.config.ConfigDef > > object Defaults { > val SegmentSize = 1024 * 1024 > @@ -106,6 +106,10 @@ case class LogConfig(val segmentSize: Int = > Defaults.SegmentSize, > } > > object LogConfig { > + > + val Delete = "delete" > + val Compact = "compact" > + > val SegmentBytesProp = "segment.bytes" > val SegmentMsProp = "segment.ms" > val SegmentJitterMsProp = "segment.jitter.ms" > @@ -123,46 +127,61 @@ object LogConfig { > val UncleanLeaderElectionEnableProp = "unclean.leader.election.enable" > val MinInSyncReplicasProp = "min.insync.replicas" > > - val ConfigNames = Set(SegmentBytesProp, > - SegmentMsProp, > - SegmentJitterMsProp, > - SegmentIndexBytesProp, 
> - FlushMessagesProp, > - FlushMsProp, > - RetentionBytesProp, > - RententionMsProp, > - MaxMessageBytesProp, > - IndexIntervalBytesProp, > - FileDeleteDelayMsProp, > - DeleteRetentionMsProp, > - MinCleanableDirtyRatioProp, > - CleanupPolicyProp, > - UncleanLeaderElectionEnableProp, > - MinInSyncReplicasProp) > + private val configDef = { > + import ConfigDef.Range._ > + import ConfigDef.ValidString._ > + import ConfigDef.Type._ > + import ConfigDef.Importance._ > + import java.util.Arrays.asList > + > + // TODO clarify importance > + // TODO clarify validations > + // TODO define documentation > + new ConfigDef() > + .define(SegmentBytesProp, INT, Defaults.SegmentSize, atLeast(0), > MEDIUM, "") > + .define(SegmentMsProp, LONG, Defaults.SegmentMs, atLeast(0), MEDIUM, > "") > + .define(SegmentJitterMsProp, LONG, Defaults.SegmentJitterMs, > atLeast(0), MEDIUM, "") > + .define(SegmentIndexBytesProp, INT, Defaults.MaxIndexSize, > atLeast(0), MEDIUM, "") > + .define(FlushMessagesProp, LONG, Defaults.FlushInterval, > atLeast(0), MEDIUM, "") > + .define(FlushMsProp, LONG, Defaults.FlushMs, atLeast(0), MEDIUM, "") > + .define(RetentionBytesProp, LONG, Defaults.RetentionSize, > atLeast(0), MEDIUM, "") > + .define(RententionMsProp, LONG, Defaults.RetentionMs, atLeast(0), > MEDIUM, "") > + .define(MaxMessageBytesProp, INT, Defaults.MaxMessageSize, > atLeast(0), MEDIUM, "") > + .define(IndexIntervalBytesProp, INT, Defaults.IndexInterval, > atLeast(0), MEDIUM, "") > + .define(DeleteRetentionMsProp, LONG, Defaults.DeleteRetentionMs, > atLeast(0), MEDIUM, "") > + .define(FileDeleteDelayMsProp, LONG, Defaults.FileDeleteDelayMs, > atLeast(0), MEDIUM, "") > + .define(MinCleanableDirtyRatioProp, DOUBLE, > Defaults.MinCleanableDirtyRatio, atLeast(0), MEDIUM, "") > + .define(CleanupPolicyProp, STRING, if (Defaults.Compact) Delete > else Compact, in(asList(Compact, Delete)), MEDIUM, "") > + .define(UncleanLeaderElectionEnableProp, BOOLEAN, > Defaults.UncleanLeaderElectionEnable, 
MEDIUM, "") > + .define(MinInSyncReplicasProp, INT, Defaults.MinInSyncReplicas, > atLeast(1), MEDIUM, "") > + } > + > + def configNames() = { > + import JavaConversions._ > + configDef.names().toList.sorted > + } > > /** > * Parse the given properties instance into a LogConfig object > */ > def fromProps(props: Properties): LogConfig = { > - new LogConfig(segmentSize = props.getProperty(SegmentBytesProp, > Defaults.SegmentSize.toString).toInt, > - segmentMs = props.getProperty(SegmentMsProp, > Defaults.SegmentMs.toString).toLong, > - segmentJitterMs = > props.getProperty(SegmentJitterMsProp, > Defaults.SegmentJitterMs.toString).toLong, > - maxIndexSize = > props.getProperty(SegmentIndexBytesProp, > Defaults.MaxIndexSize.toString).toInt, > - flushInterval = > props.getProperty(FlushMessagesProp, > Defaults.FlushInterval.toString).toLong, > - flushMs = props.getProperty(FlushMsProp, > Defaults.FlushMs.toString).toLong, > - retentionSize = > props.getProperty(RetentionBytesProp, > Defaults.RetentionSize.toString).toLong, > - retentionMs = props.getProperty(RententionMsProp, > Defaults.RetentionMs.toString).toLong, > - maxMessageSize = > props.getProperty(MaxMessageBytesProp, > Defaults.MaxMessageSize.toString).toInt, > - indexInterval = > props.getProperty(IndexIntervalBytesProp, > Defaults.IndexInterval.toString).toInt, > - fileDeleteDelayMs = > props.getProperty(FileDeleteDelayMsProp, > Defaults.FileDeleteDelayMs.toString).toInt, > - deleteRetentionMs = > props.getProperty(DeleteRetentionMsProp, > Defaults.DeleteRetentionMs.toString).toLong, > - minCleanableRatio = > props.getProperty(MinCleanableDirtyRatioProp, > - Defaults.MinCleanableDirtyRatio.toString).toDouble, > - compact = props.getProperty(CleanupPolicyProp, > if(Defaults.Compact) "compact" else "delete") > - .trim.toLowerCase != "delete", > - uncleanLeaderElectionEnable = > props.getProperty(UncleanLeaderElectionEnableProp, > - > Defaults.UncleanLeaderElectionEnable.toString).toBoolean, > - 
minInSyncReplicas = > > props.getProperty(MinInSyncReplicasProp,Defaults.MinInSyncReplicas.toString).toInt) > + val parsed = configDef.parse(props) > + new LogConfig(segmentSize = > parsed.get(SegmentBytesProp).asInstanceOf[Int], > + segmentMs = > parsed.get(SegmentMsProp).asInstanceOf[Long], > + segmentJitterMs = > parsed.get(SegmentJitterMsProp).asInstanceOf[Long], > + maxIndexSize = > parsed.get(SegmentIndexBytesProp).asInstanceOf[Int], > + flushInterval = > parsed.get(FlushMessagesProp).asInstanceOf[Long], > + flushMs = parsed.get(FlushMsProp).asInstanceOf[Long], > + retentionSize = > parsed.get(RetentionBytesProp).asInstanceOf[Long], > + retentionMs = > parsed.get(RententionMsProp).asInstanceOf[Long], > + maxMessageSize = > parsed.get(MaxMessageBytesProp).asInstanceOf[Int], > + indexInterval = > parsed.get(IndexIntervalBytesProp).asInstanceOf[Int], > + fileDeleteDelayMs = > parsed.get(FileDeleteDelayMsProp).asInstanceOf[Int], > + deleteRetentionMs = > parsed.get(DeleteRetentionMsProp).asInstanceOf[Long], > + minCleanableRatio = > parsed.get(MinCleanableDirtyRatioProp).asInstanceOf[Double], > + compact = > parsed.get(CleanupPolicyProp).asInstanceOf[String].toLowerCase != > Delete, > + uncleanLeaderElectionEnable = > parsed.get(UncleanLeaderElectionEnableProp).asInstanceOf[Boolean], > + minInSyncReplicas = > parsed.get(MinInSyncReplicasProp).asInstanceOf[Int]) > } > > /** > @@ -179,30 +198,17 @@ object LogConfig { > */ > def validateNames(props: Properties) { > import JavaConversions._ > + val names = configDef.names() > for(name <- props.keys) > - require(LogConfig.ConfigNames.contains(name), "Unknown > configuration \"%s\".".format(name)) > + require(names.contains(name), "Unknown configuration > \"%s\".".format(name)) > } > > /** > - * Check that the given properties contain only valid log config > names, and that all values can be parsed. 
> + * Check that the given properties contain only valid log config > names and that all values can be parsed and are valid > */ > def validate(props: Properties) { > validateNames(props) > - validateMinInSyncReplicas(props) > - LogConfig.fromProps(LogConfig().toProps, props) // check that we > can parse the values > - } > - > - /** > - * Check that MinInSyncReplicas is reasonable > - * Unfortunately, we can't validate its smaller than number of replicas > - * since we don't have this information here > - */ > - private def validateMinInSyncReplicas(props: Properties) { > - val minIsr = props.getProperty(MinInSyncReplicasProp) > - if (minIsr != null && minIsr.toInt < 1) { > - throw new InvalidConfigException("Wrong value " + minIsr + " of > min.insync.replicas in topic configuration; " + > - " Valid values are at least 1") > - } > + configDef.parse(props) > } > > -} > \ No newline at end of file > +} >