[
https://issues.apache.org/jira/browse/KAFKA-6138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16347672#comment-16347672
]
ASF GitHub Bot commented on KAFKA-6138:
---------------------------------------
mjsax closed pull request #4430: KAFKA-6138 Simplify
StreamsBuilder#addGlobalStore
URL: https://github.com/apache/kafka/pull/4430
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index a94b0a74622..6551deeaed2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -453,6 +453,28 @@ public synchronized StreamsBuilder addStateStore(final
StoreBuilder builder) {
return this;
}
+ /**
+ * @deprecated use {@link #addGlobalStore(StoreBuilder, String, Consumed,
ProcessorSupplier)} instead
+ */
+ @SuppressWarnings("unchecked")
+ @Deprecated
+ public synchronized StreamsBuilder addGlobalStore(final StoreBuilder
storeBuilder,
+ final String topic,
+ final String sourceName,
+ final Consumed consumed,
+ final String
processorName,
+ final ProcessorSupplier
stateUpdateSupplier) {
+ Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
+ Objects.requireNonNull(consumed, "consumed can't be null");
+ internalStreamsBuilder.addGlobalStore(storeBuilder,
+ sourceName,
+ topic,
+ new ConsumedInternal<>(consumed),
+ processorName,
+ stateUpdateSupplier);
+ return this;
+ }
+
/**
* Adds a global {@link StateStore} to the topology.
* The {@link StateStore} sources its data from all partitions of the
provided input topic.
@@ -467,10 +489,8 @@ public synchronized StreamsBuilder addStateStore(final
StoreBuilder builder) {
* The default {@link TimestampExtractor} as specified in the {@link
StreamsConfig config} is used.
*
* @param storeBuilder user defined {@link StoreBuilder}; can't
be {@code null}
- * @param sourceName name of the {@link SourceNode} that will
be automatically added
* @param topic the topic to source the data from
* @param consumed the instance of {@link Consumed} used to
define optional parameters; can't be {@code null}
- * @param processorName the name of the {@link ProcessorSupplier}
* @param stateUpdateSupplier the instance of {@link ProcessorSupplier}
* @return itself
* @throws TopologyException if the processor of state is already
registered
@@ -478,18 +498,14 @@ public synchronized StreamsBuilder addStateStore(final
StoreBuilder builder) {
@SuppressWarnings("unchecked")
public synchronized StreamsBuilder addGlobalStore(final StoreBuilder
storeBuilder,
final String topic,
- final String sourceName,
final Consumed consumed,
- final String
processorName,
final ProcessorSupplier
stateUpdateSupplier) {
Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
Objects.requireNonNull(consumed, "consumed can't be null");
internalStreamsBuilder.addGlobalStore(storeBuilder,
- sourceName,
- topic,
- new ConsumedInternal<>(consumed),
- processorName,
- stateUpdateSupplier);
+ topic,
+ new ConsumedInternal<>(consumed),
+ stateUpdateSupplier);
return this;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 4308e5d0c50..28787a5f865 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -194,4 +194,20 @@ public synchronized void addGlobalStore(final
StoreBuilder<KeyValueStore> storeB
processorName,
stateUpdateSupplier);
}
+
+ public synchronized void addGlobalStore(final StoreBuilder<KeyValueStore>
storeBuilder,
+ final String topic,
+ final ConsumedInternal consumed,
+ final ProcessorSupplier
stateUpdateSupplier) {
+ // explicitly disable logging for global stores
+ storeBuilder.withLoggingDisabled();
+ final String sourceName = newProcessorName(KStreamImpl.SOURCE_NAME);
+ final String processorName = newProcessorName(KTableImpl.SOURCE_NAME);
+ addGlobalStore(storeBuilder,
+ sourceName,
+ topic,
+ consumed,
+ processorName,
+ stateUpdateSupplier);
+ }
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Simplify StreamsBuilder#addGlobalStore
> --------------------------------------
>
> Key: KAFKA-6138
> URL: https://issues.apache.org/jira/browse/KAFKA-6138
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 1.0.0
> Reporter: Matthias J. Sax
> Assignee: Panuwat Anawatmongkhon
> Priority: Major
> Labels: beginner, kip, newbie
>
> {{StreamsBuilder#addGlobalStore}} is conceptually a 1:1 copy of
> {{Topology#addGlobalStore}}, although it should follow DSL design principles.
> Atm, {{StreamsBuilder#addGlobalStore}} does not provide a good user
> experience as it forces users to specify processor names –
> processor names are a Processor API detail that should be hidden in the DSL. The
> current API is the following:
> {noformat}
> public synchronized StreamsBuilder addGlobalStore(final StoreBuilder
> storeBuilder,
> final String topic,
> final String sourceName,
> final Consumed consumed,
> final String
> processorName,
> final ProcessorSupplier
> stateUpdateSupplier)
> {noformat}
> We should remove the two parameters {{sourceName}} and {{processorName}}. To
> be backward compatible, the current method must be deprecated and a new
> method should be added with a reduced number of parameters.
> KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-233%3A+Simplify+StreamsBuilder%23addGlobalStore
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)