Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/1808#discussion_r96730185
--- Diff:
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
---
@@ -235,41 +355,116 @@ public Builder(Map<String, Object> kafkaProps,
KafkaSpoutStreams kafkaSpoutStrea
this.firstPollOffsetStrategy = firstPollOffsetStrategy;
return this;
}
-
+
/**
- * Sets partition refresh period in milliseconds in manual
partition assignment model. Default is 2s.
- * @param partitionRefreshPeriodMs time in milliseconds
+ * Sets the retry service for the spout to use.
+ * @param retryService the new retry service
+ * @return the builder (this).
*/
- public Builder<K, V> setPartitionRefreshPeriodMs(long
partitionRefreshPeriodMs) {
- this.partitionRefreshPeriodMs = partitionRefreshPeriodMs;
+ public Builder<K, V> setRetry(KafkaSpoutRetryService retryService)
{
+ if (retryService == null) {
+ throw new NullPointerException("retryService cannot be
null");
+ }
+ this.retryService = retryService;
return this;
}
+ public Builder<K, V> setRecordTranslator(RecordTranslator<K, V>
translator) {
+ this.translator = translator;
+ return this;
+ }
+
+ public Builder<K, V> setRecordTranslator(Func<ConsumerRecord<K,
V>, List<Object>> func, Fields fields) {
+ return setRecordTranslator(new SimpleRecordTranslator<>(func,
fields));
+ }
+
+ public Builder<K, V> setRecordTranslator(Func<ConsumerRecord<K,
V>, List<Object>> func, Fields fields, String stream) {
+ return setRecordTranslator(new SimpleRecordTranslator<>(func,
fields, stream));
+ }
+
/**
- * Defines whether the consumer manages partition manually.
- * If set to true, the consumer manage partition manually,
otherwise it will rely on kafka to do partition assignment.
- * @param manualPartitionAssignment True if using manual partition
assignment.
+ * Sets partition refresh period in milliseconds. This is how
often the subscription is refreshed
+ * For most subscriptions that go through the
KafkaConsumer.subscribe this is ignored.
+ * @param partitionRefreshPeriodMs time in milliseconds
+ * @return the builder (this)
*/
- public Builder<K, V> setManualPartitionAssignment(boolean
manualPartitionAssignment) {
- this.manualPartitionAssignment = manualPartitionAssignment;
- return this;
+ public Builder<K, V> setPartitionRefreshPeriodMs(long
partitionRefreshPeriodMs) {
+ this.partitionRefreshPeriodMs = partitionRefreshPeriodMs;
+ return this;
}
-
+
public KafkaSpoutConfig<K,V> build() {
return new KafkaSpoutConfig<>(this);
}
}
+ // Kafka consumer configuration
+ private final Map<String, Object> kafkaProps;
+ private final Subscription subscription;
+ private final SerializableDeserializer<K> keyDes;
+ private final Class<? extends Deserializer<K>> keyDesClazz;
+ private final SerializableDeserializer<V> valueDes;
+ private final Class<? extends Deserializer<V>> valueDesClazz;
+ private final long pollTimeoutMs;
+
+ // Kafka spout configuration
+ private final RecordTranslator<K, V> translator;
+ private final long offsetCommitPeriodMs;
+ private final int maxRetries;
+ private final int maxUncommittedOffsets;
+ private final FirstPollOffsetStrategy firstPollOffsetStrategy;
+ private final KafkaSpoutRetryService retryService;
+ private final long partitionRefreshPeriodMs;
+
+ private KafkaSpoutConfig(Builder<K,V> builder) {
+ this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps);
+ this.subscription = builder.subscription;
+ this.translator = builder.translator;
+ this.pollTimeoutMs = builder.pollTimeoutMs;
+ this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
+ this.maxRetries = builder.maxRetries;
+ this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
+ this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
+ this.retryService = builder.retryService;
+ this.keyDes = builder.keyDes;
+ this.keyDesClazz = builder.keyDesClazz;
+ this.valueDes = builder.valueDes;
+ this.valueDesClazz = builder.valueDesClazz;
+ this.partitionRefreshPeriodMs = builder.partitionRefreshPeriodMs;
+ }
+
public Map<String, Object> getKafkaProps() {
return kafkaProps;
}
public Deserializer<K> getKeyDeserializer() {
- return keyDeserializer;
+ if (keyDesClazz != null) {
+ try {
--- End diff --
Tabs
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---