This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b822206 KAFKA-6138 Simplify StreamsBuilder#addGlobalStore (#4430)
b822206 is described below
commit b8222065e0cc09f455dc5b2aa43c1d68cf0d4a1f
Author: Panuwat Anawatmongkhon <[email protected]>
AuthorDate: Thu Feb 1 04:53:15 2018 +0700
KAFKA-6138 Simplify StreamsBuilder#addGlobalStore (#4430)
- implements KIP-233
Author: Panuwat Anawatmongkhon <[email protected]>
Reviewers: Bill Bejeck <[email protected]>, Damian Guy
<[email protected]>, Matthias J. Sax <[email protected]>
---
.../org/apache/kafka/streams/StreamsBuilder.java | 34 ++++++++++++++++------
.../kstream/internals/InternalStreamsBuilder.java | 16 ++++++++++
2 files changed, 41 insertions(+), 9 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index 27105c6..f08098e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -454,6 +454,28 @@ public class StreamsBuilder {
}
/**
+ * @deprecated use {@link #addGlobalStore(StoreBuilder, String, Consumed, ProcessorSupplier)} instead
+ */
+ @SuppressWarnings("unchecked")
+ @Deprecated
+ public synchronized StreamsBuilder addGlobalStore(final StoreBuilder storeBuilder,
+ final String topic,
+ final String sourceName,
+ final Consumed consumed,
+ final String processorName,
+ final ProcessorSupplier stateUpdateSupplier) {
+ Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
+ Objects.requireNonNull(consumed, "consumed can't be null");
+ internalStreamsBuilder.addGlobalStore(storeBuilder,
+ sourceName,
+ topic,
+ new ConsumedInternal<>(consumed),
+ processorName,
+ stateUpdateSupplier);
+ return this;
+ }
+
+ /**
* Adds a global {@link StateStore} to the topology.
* The {@link StateStore} sources its data from all partitions of the provided input topic.
* There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
@@ -467,10 +489,8 @@ public class StreamsBuilder {
* The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
*
* @param storeBuilder user defined {@link StoreBuilder}; can't be {@code null}
- * @param sourceName name of the {@link SourceNode} that will be automatically added
* @param topic the topic to source the data from
* @param consumed the instance of {@link Consumed} used to define optional parameters; can't be {@code null}
- * @param processorName the name of the {@link ProcessorSupplier}
* @param stateUpdateSupplier the instance of {@link ProcessorSupplier}
* @return itself
* @throws TopologyException if the processor of state is already registered
@@ -478,18 +498,14 @@ public class StreamsBuilder {
@SuppressWarnings("unchecked")
public synchronized StreamsBuilder addGlobalStore(final StoreBuilder storeBuilder,
final String topic,
- final String sourceName,
final Consumed consumed,
- final String processorName,
final ProcessorSupplier stateUpdateSupplier) {
Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
Objects.requireNonNull(consumed, "consumed can't be null");
internalStreamsBuilder.addGlobalStore(storeBuilder,
- sourceName,
- topic,
- new ConsumedInternal<>(consumed),
- processorName,
- stateUpdateSupplier);
+ topic,
+ new ConsumedInternal<>(consumed),
+ stateUpdateSupplier);
return this;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 2a8a89e..0b028e6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -194,4 +194,20 @@ public class InternalStreamsBuilder implements InternalNameProvider {
processorName,
stateUpdateSupplier);
}
+
+ public synchronized void addGlobalStore(final StoreBuilder<KeyValueStore> storeBuilder,
+ final String topic,
+ final ConsumedInternal consumed,
+ final ProcessorSupplier stateUpdateSupplier) {
+ // explicitly disable logging for global stores
+ storeBuilder.withLoggingDisabled();
+ final String sourceName = newProcessorName(KStreamImpl.SOURCE_NAME);
+ final String processorName = newProcessorName(KTableImpl.SOURCE_NAME);
+ addGlobalStore(storeBuilder,
+ sourceName,
+ topic,
+ consumed,
+ processorName,
+ stateUpdateSupplier);
+ }
}
--
To stop receiving notification emails like this one, please contact
[email protected].