[ https://issues.apache.org/jira/browse/KAFKA-6138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16347672#comment-16347672 ]
ASF GitHub Bot commented on KAFKA-6138: --------------------------------------- mjsax closed pull request #4430: KAFKA-6138 Simplify StreamsBuilder#addGlobalStore URL: https://github.com/apache/kafka/pull/4430 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java index a94b0a74622..6551deeaed2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java @@ -453,6 +453,28 @@ public synchronized StreamsBuilder addStateStore(final StoreBuilder builder) { return this; } + /** + * @deprecated use {@link #addGlobalStore(StoreBuilder, String, Consumed, ProcessorSupplier)} instead + */ + @SuppressWarnings("unchecked") + @Deprecated + public synchronized StreamsBuilder addGlobalStore(final StoreBuilder storeBuilder, + final String topic, + final String sourceName, + final Consumed consumed, + final String processorName, + final ProcessorSupplier stateUpdateSupplier) { + Objects.requireNonNull(storeBuilder, "storeBuilder can't be null"); + Objects.requireNonNull(consumed, "consumed can't be null"); + internalStreamsBuilder.addGlobalStore(storeBuilder, + sourceName, + topic, + new ConsumedInternal<>(consumed), + processorName, + stateUpdateSupplier); + return this; + } + /** * Adds a global {@link StateStore} to the topology. * The {@link StateStore} sources its data from all partitions of the provided input topic. @@ -467,10 +489,8 @@ public synchronized StreamsBuilder addStateStore(final StoreBuilder builder) { * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used. * * @param storeBuilder user defined {@link StoreBuilder}; can't be {@code null} - * @param sourceName name of the {@link SourceNode} that will be automatically added * @param topic the topic to source the data from * @param consumed the instance of {@link Consumed} used to define optional parameters; can't be {@code null} - * @param processorName the name of the {@link ProcessorSupplier} * @param stateUpdateSupplier the instance of {@link ProcessorSupplier} * @return itself * @throws TopologyException if the processor of state is already registered @@ -478,18 +498,14 @@ public synchronized StreamsBuilder addStateStore(final StoreBuilder builder) { @SuppressWarnings("unchecked") public synchronized StreamsBuilder addGlobalStore(final StoreBuilder storeBuilder, final String topic, - final String sourceName, final Consumed consumed, - final String processorName, final ProcessorSupplier stateUpdateSupplier) { Objects.requireNonNull(storeBuilder, "storeBuilder can't be null"); Objects.requireNonNull(consumed, "consumed can't be null"); internalStreamsBuilder.addGlobalStore(storeBuilder, - sourceName, - topic, - new ConsumedInternal<>(consumed), - processorName, - stateUpdateSupplier); + topic, + new ConsumedInternal<>(consumed), + stateUpdateSupplier); return this; } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java index 4308e5d0c50..28787a5f865 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java @@ -194,4 +194,20 @@ public synchronized void addGlobalStore(final StoreBuilder<KeyValueStore> storeB processorName, stateUpdateSupplier); } + + public synchronized void addGlobalStore(final StoreBuilder<KeyValueStore> storeBuilder, + final String topic, + final ConsumedInternal consumed, + final ProcessorSupplier stateUpdateSupplier) { + // explicitly disable logging for global stores + storeBuilder.withLoggingDisabled(); + final String sourceName = newProcessorName(KStreamImpl.SOURCE_NAME); + final String processorName = newProcessorName(KTableImpl.SOURCE_NAME); + addGlobalStore(storeBuilder, + sourceName, + topic, + consumed, + processorName, + stateUpdateSupplier); + } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Simplify StreamsBuilder#addGlobalStore > -------------------------------------- > > Key: KAFKA-6138 > URL: https://issues.apache.org/jira/browse/KAFKA-6138 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 1.0.0 > Reporter: Matthias J. Sax > Assignee: Panuwat Anawatmongkhon > Priority: Major > Labels: beginner, kip, newbie > > {{StreamsBuilder#addGlobalStore}} is conceptually a 1:1 copy of > {{Topology#addGlobalStore}}, that would follow DSL design principles though. > Atm, {{StreamsBuilder#addGlobalStore}} does not follow provide a good user > experience as it forces users to specify names for processor names – > processor name are a Processor API detail should be hidden in the DSL. The > current API is the following: > {noformat} > public synchronized StreamsBuilder addGlobalStore(final StoreBuilder > storeBuilder, > final String topic, > final String sourceName, > final Consumed consumed, > final String > processorName, > final ProcessorSupplier > stateUpdateSupplier) > {noformat} > We should remove the two parameters {{sourceName}} and {{processorName}}. To > be backward compatible, the current method must be deprecated and a new > method should be added with reduced number of parameters. > KIP: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-233%3A+Simplify+StreamsBuilder%23addGlobalStore -- This message was sent by Atlassian JIRA (v7.6.3#76005)