Repository: kafka Updated Branches: refs/heads/trunk 81f0c1e8f -> d9479275f
MINOR: Improve streams config parameters Adjust "importance level" and add explanation to the docs. Author: Matthias J. Sax <[email protected]> Reviewers: Eno Thereska <[email protected]>, Guozhang Wang <[email protected]> Closes #2855 from mjsax/minor-improve-streams-config-parameters Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d9479275 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d9479275 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d9479275 Branch: refs/heads/trunk Commit: d9479275fdacd172a2b545007b55e787c53fe54e Parents: 81f0c1e Author: Matthias J. Sax <[email protected]> Authored: Wed May 31 14:53:28 2017 -0700 Committer: Guozhang Wang <[email protected]> Committed: Wed May 31 14:53:28 2017 -0700 ---------------------------------------------------------------------- docs/streams.html | 6 + .../org/apache/kafka/streams/StreamsConfig.java | 314 ++++++++++--------- 2 files changed, 167 insertions(+), 153 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/d9479275/docs/streams.html ---------------------------------------------------------------------- diff --git a/docs/streams.html b/docs/streams.html index fe0e84e..b70074a 100644 --- a/docs/streams.html +++ b/docs/streams.html @@ -664,7 +664,13 @@ KStream<String, String> materialized = joined.through("topic4"); Besides defining the topology, developers will also need to configure their applications in <code>StreamsConfig</code> before running it. A complete list of Kafka Streams configs can be found <a href="/{{version}}/documentation/#streamsconfigs"><b>here</b></a>. 
+ Note that different parameters have different "levels of importance", with the following interpretation: </p> + <ul> + <li> HIGH: you would most likely change the default value if you go to production </li> + <li> MEDIUM: default value might be ok, but you should double-check it </li> + <li> LOW: default value is most likely ok; only consider changing it if you hit an issue when running in production </li> + </ul> <p> Specifying the configuration in Kafka Streams is similar to the Kafka Producer and Consumer clients. Typically, you create a <code>java.util.Properties</code> instance, http://git-wip-us.apache.org/repos/asf/kafka/blob/d9479275/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 4238490..582aa9b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -145,7 +145,6 @@ public class StreamsConfig extends AbstractConfig { /** {@code connections.max.idle.ms} */ public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG; - private static final String CONNECTIONS_MAX_IDLE_MS_DOC = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC; /** {@code default key.serde} */ public static final String DEFAULT_KEY_SERDE_CLASS_CONFIG = "default.key.serde"; @@ -159,15 +158,16 @@ public class StreamsConfig extends AbstractConfig { public static final String DEFAULT_VALUE_SERDE_CLASS_CONFIG = "default.value.serde"; private static final String DEFAULT_VALUE_SERDE_CLASS_DOC = "Default serializer / deserializer class for value that implements the <code>Serde</code> interface."; - /** {@code key.serde} */ + /** + * {@code key.serde} + * @deprecated Use {@link #DEFAULT_KEY_SERDE_CLASS_CONFIG} 
instead. + */ @Deprecated public static final String KEY_SERDE_CLASS_CONFIG = "key.serde"; - @Deprecated - private static final String KEY_SERDE_CLASS_DOC = "Serializer / deserializer class for key that implements the <code>Serde</code> interface. This config is deprecated, use \"default.key.serde\" instead"; + private static final String KEY_SERDE_CLASS_DOC = "Serializer / deserializer class for key that implements the <code>Serde</code> interface. This config is deprecated, use <code>" + DEFAULT_KEY_SERDE_CLASS_CONFIG + "</code> instead"; /** {@code metadata.max.age.ms} */ public static final String METADATA_MAX_AGE_CONFIG = CommonClientConfigs.METADATA_MAX_AGE_CONFIG; - private static final String METADATA_MAX_AGE_DOC = CommonClientConfigs.METADATA_MAX_AGE_DOC; /** {@code metrics.num.samples} */ public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG; @@ -203,15 +203,12 @@ public class StreamsConfig extends AbstractConfig { /** {@code receive.buffer.bytes} */ public static final String RECEIVE_BUFFER_CONFIG = CommonClientConfigs.RECEIVE_BUFFER_CONFIG; - private static final String RECEIVE_BUFFER_DOC = CommonClientConfigs.RECEIVE_BUFFER_DOC; /** {@code reconnect.backoff.ms} */ public static final String RECONNECT_BACKOFF_MS_CONFIG = CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG; - private static final String RECONNECT_BACKOFF_MS_DOC = CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC; /** {@code reconnect.backoff.max} */ public static final String RECONNECT_BACKOFF_MAX_MS_CONFIG = CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG; - private static final String RECONNECT_BACKOFF_MAX_MS_DOC = CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_DOC; /** {@code replication.factor} */ public static final String REPLICATION_FACTOR_CONFIG = "replication.factor"; @@ -219,11 +216,9 @@ public class StreamsConfig extends AbstractConfig { /** {@code request.timeout.ms} */ public static final String REQUEST_TIMEOUT_MS_CONFIG = 
CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG; - private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC; /** {@code retry.backoff.ms} */ public static final String RETRY_BACKOFF_MS_CONFIG = CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG; - private static final String RETRY_BACKOFF_MS_DOC = CommonClientConfigs.RETRY_BACKOFF_MS_DOC; /** {@code rocksdb.config.setter} */ public static final String ROCKSDB_CONFIG_SETTER_CLASS_CONFIG = "rocksdb.config.setter"; @@ -231,12 +226,9 @@ public class StreamsConfig extends AbstractConfig { /** {@code security.protocol} */ public static final String SECURITY_PROTOCOL_CONFIG = CommonClientConfigs.SECURITY_PROTOCOL_CONFIG; - private static final String SECURITY_PROTOCOL_DOC = CommonClientConfigs.SECURITY_PROTOCOL_DOC; - public static final String DEFAULT_SECURITY_PROTOCOL = CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL; /** {@code send.buffer.bytes} */ public static final String SEND_BUFFER_CONFIG = CommonClientConfigs.SEND_BUFFER_CONFIG; - private static final String SEND_BUFFER_DOC = CommonClientConfigs.SEND_BUFFER_DOC; /** {@code state.cleanup.delay} */ public static final String STATE_CLEANUP_DELAY_MS_CONFIG = "state.cleanup.delay.ms"; @@ -246,17 +238,21 @@ public class StreamsConfig extends AbstractConfig { public static final String STATE_DIR_CONFIG = "state.dir"; private static final String STATE_DIR_DOC = "Directory location for state store."; - /** {@code timestamp.extractor} */ + /** + * {@code timestamp.extractor} + * @deprecated Use {@link #DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG} instead. + */ @Deprecated public static final String TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "timestamp.extractor"; - @Deprecated - private static final String TIMESTAMP_EXTRACTOR_CLASS_DOC = "Timestamp extractor class that implements the <code>TimestampExtractor</code> interface. 
This config is deprecated, use \"default.timestamp.extractor\" instead"; + private static final String TIMESTAMP_EXTRACTOR_CLASS_DOC = "Timestamp extractor class that implements the <code>TimestampExtractor</code> interface. This config is deprecated, use <code>" + DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG + "</code> instead"; - /** {@code value.serde} */ + /** + * {@code value.serde} + * @deprecated Use {@link #DEFAULT_VALUE_SERDE_CLASS_CONFIG} instead. + */ @Deprecated public static final String VALUE_SERDE_CLASS_CONFIG = "value.serde"; - @Deprecated - private static final String VALUE_SERDE_CLASS_DOC = "Serializer / deserializer class for value that implements the <code>Serde</code> interface. This config is deprecated, use \"default.value.serde\" instead"; + private static final String VALUE_SERDE_CLASS_DOC = "Serializer / deserializer class for value that implements the <code>Serde</code> interface. This config is deprecated, use <code>" + DEFAULT_VALUE_SERDE_CLASS_CONFIG + "</code> instead"; /** {@code windowstore.changelog.additional.retention.ms} */ public static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG = "windowstore.changelog.additional.retention.ms"; @@ -272,6 +268,9 @@ public class StreamsConfig extends AbstractConfig { static { CONFIG = new ConfigDef() + + // HIGH + .define(APPLICATION_ID_CONFIG, // required with no default value Type.STRING, Importance.HIGH, @@ -280,193 +279,202 @@ public class StreamsConfig extends AbstractConfig { Type.LIST, Importance.HIGH, CommonClientConfigs.BOOTSTRAP_SERVERS_DOC) - .define(CLIENT_ID_CONFIG, - Type.STRING, - "", - Importance.HIGH, - CommonClientConfigs.CLIENT_ID_DOC) - .define(ZOOKEEPER_CONNECT_CONFIG, - Type.STRING, - "", - Importance.HIGH, - ZOOKEEPER_CONNECT_DOC) - .define(STATE_DIR_CONFIG, - Type.STRING, - "/tmp/kafka-streams", - Importance.MEDIUM, - STATE_DIR_DOC) .define(REPLICATION_FACTOR_CONFIG, Type.INT, 1, Importance.HIGH, REPLICATION_FACTOR_DOC) - 
.define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG, + .define(STATE_DIR_CONFIG, + Type.STRING, + "/tmp/kafka-streams", + Importance.HIGH, + STATE_DIR_DOC) + + // MEDIUM + + .define(CACHE_MAX_BYTES_BUFFERING_CONFIG, + Type.LONG, + 10 * 1024 * 1024L, + atLeast(0), + Importance.MEDIUM, + CACHE_MAX_BYTES_BUFFERING_DOC) + .define(CLIENT_ID_CONFIG, + Type.STRING, + "", + Importance.MEDIUM, + CommonClientConfigs.CLIENT_ID_DOC) + .define(DEFAULT_KEY_SERDE_CLASS_CONFIG, Type.CLASS, - null, + Serdes.ByteArraySerde.class.getName(), Importance.MEDIUM, - TIMESTAMP_EXTRACTOR_CLASS_DOC) + DEFAULT_KEY_SERDE_CLASS_DOC) .define(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, - Type.CLASS, - FailOnInvalidTimestamp.class.getName(), - Importance.MEDIUM, - DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC) - .define(PARTITION_GROUPER_CLASS_CONFIG, Type.CLASS, - DefaultPartitionGrouper.class.getName(), + FailOnInvalidTimestamp.class.getName(), Importance.MEDIUM, - PARTITION_GROUPER_CLASS_DOC) - .define(KEY_SERDE_CLASS_CONFIG, - Type.CLASS, - null, - Importance.MEDIUM, - KEY_SERDE_CLASS_DOC) - .define(DEFAULT_KEY_SERDE_CLASS_CONFIG, - Type.CLASS, - Serdes.ByteArraySerde.class.getName(), - Importance.MEDIUM, - DEFAULT_KEY_SERDE_CLASS_DOC) - .define(VALUE_SERDE_CLASS_CONFIG, + DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC) + .define(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Type.CLASS, - null, + Serdes.ByteArraySerde.class.getName(), Importance.MEDIUM, - VALUE_SERDE_CLASS_DOC) - .define(DEFAULT_VALUE_SERDE_CLASS_CONFIG, - Type.CLASS, - Serdes.ByteArraySerde.class.getName(), - Importance.MEDIUM, - DEFAULT_VALUE_SERDE_CLASS_DOC) - .define(COMMIT_INTERVAL_MS_CONFIG, - Type.LONG, - DEFAULT_COMMIT_INTERVAL_MS, - Importance.LOW, - COMMIT_INTERVAL_MS_DOC) - .define(POLL_MS_CONFIG, - Type.LONG, - 100, - Importance.LOW, - POLL_MS_DOC) + DEFAULT_VALUE_SERDE_CLASS_DOC) + .define(NUM_STANDBY_REPLICAS_CONFIG, + Type.INT, + 0, + Importance.MEDIUM, + NUM_STANDBY_REPLICAS_DOC) .define(NUM_STREAM_THREADS_CONFIG, Type.INT, 1, - Importance.LOW, + 
Importance.MEDIUM, NUM_STREAM_THREADS_DOC) - .define(NUM_STANDBY_REPLICAS_CONFIG, - Type.INT, - 0, + .define(PROCESSING_GUARANTEE_CONFIG, + Type.STRING, + AT_LEAST_ONCE, + in(AT_LEAST_ONCE, EXACTLY_ONCE), + Importance.MEDIUM, + PROCESSING_GUARANTEE_DOC) + .define(SECURITY_PROTOCOL_CONFIG, + Type.STRING, + CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, + Importance.MEDIUM, + CommonClientConfigs.SECURITY_PROTOCOL_DOC) + + // LOW + + .define(APPLICATION_SERVER_CONFIG, + Type.STRING, + "", Importance.LOW, - NUM_STANDBY_REPLICAS_DOC) + APPLICATION_SERVER_DOC) .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG, Type.INT, 1000, Importance.LOW, BUFFERED_RECORDS_PER_PARTITION_DOC) - .define(STATE_CLEANUP_DELAY_MS_CONFIG, + .define(COMMIT_INTERVAL_MS_CONFIG, Type.LONG, - 10 * 60 * 1000, - Importance.LOW, - STATE_CLEANUP_DELAY_MS_DOC) - .define(METRIC_REPORTER_CLASSES_CONFIG, - Type.LIST, - "", + DEFAULT_COMMIT_INTERVAL_MS, Importance.LOW, - CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC) - .define(METRICS_SAMPLE_WINDOW_MS_CONFIG, - Type.LONG, - 30000, + COMMIT_INTERVAL_MS_DOC) + .define(CONNECTIONS_MAX_IDLE_MS_CONFIG, + ConfigDef.Type.LONG, + 9 * 60 * 1000, + ConfigDef.Importance.LOW, + CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC) + .define(METADATA_MAX_AGE_CONFIG, + ConfigDef.Type.LONG, + 5 * 60 * 1000, atLeast(0), - Importance.LOW, - CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC) + ConfigDef.Importance.LOW, + CommonClientConfigs.METADATA_MAX_AGE_DOC) .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, CommonClientConfigs.METRICS_NUM_SAMPLES_DOC) + .define(METRIC_REPORTER_CLASSES_CONFIG, + Type.LIST, + "", + Importance.LOW, + CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC) .define(METRICS_RECORDING_LEVEL_CONFIG, Type.STRING, Sensor.RecordingLevel.INFO.toString(), in(Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString()), Importance.LOW, CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC) - .define(APPLICATION_SERVER_CONFIG, 
- Type.STRING, - "", + .define(METRICS_SAMPLE_WINDOW_MS_CONFIG, + Type.LONG, + 30000, + atLeast(0), Importance.LOW, - APPLICATION_SERVER_DOC) - .define(ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, + CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC) + .define(PARTITION_GROUPER_CLASS_CONFIG, Type.CLASS, - null, + DefaultPartitionGrouper.class.getName(), Importance.LOW, - ROCKSDB_CONFIG_SETTER_CLASS_DOC) - .define(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, - Type.LONG, - 24 * 60 * 60 * 1000, - Importance.MEDIUM, - WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC) - .define(CACHE_MAX_BYTES_BUFFERING_CONFIG, + PARTITION_GROUPER_CLASS_DOC) + .define(POLL_MS_CONFIG, Type.LONG, - 10 * 1024 * 1024L, - atLeast(0), + 100, Importance.LOW, - CACHE_MAX_BYTES_BUFFERING_DOC) - .define(SECURITY_PROTOCOL_CONFIG, - Type.STRING, - DEFAULT_SECURITY_PROTOCOL, - Importance.MEDIUM, - SECURITY_PROTOCOL_DOC) - .define(CONNECTIONS_MAX_IDLE_MS_CONFIG, - ConfigDef.Type.LONG, - 9 * 60 * 1000, - ConfigDef.Importance.MEDIUM, - CONNECTIONS_MAX_IDLE_MS_DOC) - .define(RETRY_BACKOFF_MS_CONFIG, - ConfigDef.Type.LONG, - 100L, - atLeast(0L), - ConfigDef.Importance.LOW, - RETRY_BACKOFF_MS_DOC) - .define(METADATA_MAX_AGE_CONFIG, - ConfigDef.Type.LONG, - 5 * 60 * 1000, + POLL_MS_DOC) + .define(RECEIVE_BUFFER_CONFIG, + Type.INT, + 32 * 1024, atLeast(0), - ConfigDef.Importance.LOW, - METADATA_MAX_AGE_DOC) + Importance.LOW, + CommonClientConfigs.RECEIVE_BUFFER_DOC) .define(RECONNECT_BACKOFF_MS_CONFIG, - ConfigDef.Type.LONG, + Type.LONG, 50L, atLeast(0L), - ConfigDef.Importance.LOW, - RECONNECT_BACKOFF_MS_DOC) + Importance.LOW, + CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC) .define(RECONNECT_BACKOFF_MAX_MS_CONFIG, - ConfigDef.Type.LONG, + Type.LONG, 50L, atLeast(0L), ConfigDef.Importance.LOW, - RECONNECT_BACKOFF_MAX_MS_DOC) - .define(SEND_BUFFER_CONFIG, - ConfigDef.Type.INT, - 128 * 1024, - atLeast(0), - ConfigDef.Importance.MEDIUM, - SEND_BUFFER_DOC) - .define(RECEIVE_BUFFER_CONFIG, - 
ConfigDef.Type.INT, - 32 * 1024, - atLeast(0), - ConfigDef.Importance.MEDIUM, - RECEIVE_BUFFER_DOC) + CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_DOC) + .define(RETRY_BACKOFF_MS_CONFIG, + Type.LONG, + 100L, + atLeast(0L), + ConfigDef.Importance.LOW, + CommonClientConfigs.RETRY_BACKOFF_MS_DOC) .define(REQUEST_TIMEOUT_MS_CONFIG, - ConfigDef.Type.INT, + Type.INT, 40 * 1000, atLeast(0), - ConfigDef.Importance.MEDIUM, - REQUEST_TIMEOUT_MS_DOC) - .define(PROCESSING_GUARANTEE_CONFIG, - ConfigDef.Type.STRING, - AT_LEAST_ONCE, - in(AT_LEAST_ONCE, EXACTLY_ONCE), - Importance.MEDIUM, - PROCESSING_GUARANTEE_DOC); + ConfigDef.Importance.LOW, + CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC) + .define(ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, + Type.CLASS, + null, + Importance.LOW, + ROCKSDB_CONFIG_SETTER_CLASS_DOC) + .define(SEND_BUFFER_CONFIG, + Type.INT, + 128 * 1024, + atLeast(0), + Importance.LOW, + CommonClientConfigs.SEND_BUFFER_DOC) + .define(STATE_CLEANUP_DELAY_MS_CONFIG, + Type.LONG, + 10 * 60 * 1000, + Importance.LOW, + STATE_CLEANUP_DELAY_MS_DOC) + .define(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, + Type.LONG, + 24 * 60 * 60 * 1000, + Importance.LOW, + WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC) + + // @deprecated + + .define(KEY_SERDE_CLASS_CONFIG, + Type.CLASS, + null, + Importance.LOW, + KEY_SERDE_CLASS_DOC) + .define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG, + Type.CLASS, + null, + Importance.LOW, + TIMESTAMP_EXTRACTOR_CLASS_DOC) + .define(VALUE_SERDE_CLASS_CONFIG, + Type.CLASS, + null, + Importance.LOW, + VALUE_SERDE_CLASS_DOC) + .define(ZOOKEEPER_CONNECT_CONFIG, + Type.STRING, + "", + Importance.LOW, + ZOOKEEPER_CONNECT_DOC); } // this is the list of configs for underlying clients
