[ 
https://issues.apache.org/jira/browse/KAFKA-6138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16347672#comment-16347672
 ] 

ASF GitHub Bot commented on KAFKA-6138:
---------------------------------------

mjsax closed pull request #4430: KAFKA-6138 Simplify StreamsBuilder#addGlobalStore
URL: https://github.com/apache/kafka/pull/4430
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index a94b0a74622..6551deeaed2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -453,6 +453,28 @@ public synchronized StreamsBuilder addStateStore(final 
StoreBuilder builder) {
         return this;
     }
 
+    /**
+     * @deprecated use {@link #addGlobalStore(StoreBuilder, String, Consumed, 
ProcessorSupplier)} instead
+     */
+    @SuppressWarnings("unchecked")
+    @Deprecated
+    public synchronized StreamsBuilder addGlobalStore(final StoreBuilder 
storeBuilder,
+                                                      final String topic,
+                                                      final String sourceName,
+                                                      final Consumed consumed,
+                                                      final String 
processorName,
+                                                      final ProcessorSupplier 
stateUpdateSupplier) {
+        Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
+        Objects.requireNonNull(consumed, "consumed can't be null");
+        internalStreamsBuilder.addGlobalStore(storeBuilder,
+                                              sourceName,
+                                              topic,
+                                              new ConsumedInternal<>(consumed),
+                                              processorName,
+                                              stateUpdateSupplier);
+        return this;
+    }
+
     /**
      * Adds a global {@link StateStore} to the topology.
      * The {@link StateStore} sources its data from all partitions of the 
provided input topic.
@@ -467,10 +489,8 @@ public synchronized StreamsBuilder addStateStore(final 
StoreBuilder builder) {
      * The default {@link TimestampExtractor} as specified in the {@link 
StreamsConfig config} is used.
      *
      * @param storeBuilder          user defined {@link StoreBuilder}; can't 
be {@code null}
-     * @param sourceName            name of the {@link SourceNode} that will 
be automatically added
      * @param topic                 the topic to source the data from
      * @param consumed              the instance of {@link Consumed} used to 
define optional parameters; can't be {@code null}
-     * @param processorName         the name of the {@link ProcessorSupplier}
      * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
      * @return itself
      * @throws TopologyException if the processor of state is already 
registered
@@ -478,18 +498,14 @@ public synchronized StreamsBuilder addStateStore(final 
StoreBuilder builder) {
     @SuppressWarnings("unchecked")
     public synchronized StreamsBuilder addGlobalStore(final StoreBuilder 
storeBuilder,
                                                       final String topic,
-                                                      final String sourceName,
                                                       final Consumed consumed,
-                                                      final String 
processorName,
                                                       final ProcessorSupplier 
stateUpdateSupplier) {
         Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
         Objects.requireNonNull(consumed, "consumed can't be null");
         internalStreamsBuilder.addGlobalStore(storeBuilder,
-                                              sourceName,
-                                              topic,
-                                              new ConsumedInternal<>(consumed),
-                                              processorName,
-                                              stateUpdateSupplier);
+                topic,
+                new ConsumedInternal<>(consumed),
+                stateUpdateSupplier);
         return this;
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 4308e5d0c50..28787a5f865 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -194,4 +194,20 @@ public synchronized void addGlobalStore(final 
StoreBuilder<KeyValueStore> storeB
                                                processorName,
                                                stateUpdateSupplier);
     }
+    
+    public synchronized void addGlobalStore(final StoreBuilder<KeyValueStore> 
storeBuilder,
+                                            final String topic,
+                                            final ConsumedInternal consumed,
+                                            final ProcessorSupplier 
stateUpdateSupplier) {
+        // explicitly disable logging for global stores
+        storeBuilder.withLoggingDisabled();
+        final String sourceName = newProcessorName(KStreamImpl.SOURCE_NAME);
+        final String processorName = newProcessorName(KTableImpl.SOURCE_NAME);
+        addGlobalStore(storeBuilder,
+                       sourceName,
+                       topic,
+                       consumed,
+                       processorName,
+                       stateUpdateSupplier);
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Simplify StreamsBuilder#addGlobalStore
> --------------------------------------
>
>                 Key: KAFKA-6138
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6138
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Matthias J. Sax
>            Assignee: Panuwat Anawatmongkhon
>            Priority: Major
>              Labels: beginner, kip, newbie
>
> {{StreamsBuilder#addGlobalStore}} is conceptually a 1:1 copy of
> {{Topology#addGlobalStore}}, but it should follow DSL design principles.
> At the moment, {{StreamsBuilder#addGlobalStore}} does not provide a good user
> experience, as it forces users to specify names for the source and processor
> nodes – such names are a Processor API detail that should be hidden in the
> DSL. The current API is the following:
> {noformat}
>     public synchronized StreamsBuilder addGlobalStore(final StoreBuilder 
> storeBuilder,
>                                                       final String topic,
>                                                       final String sourceName,
>                                                       final Consumed consumed,
>                                                       final String 
> processorName,
>                                                       final ProcessorSupplier 
> stateUpdateSupplier)
> {noformat}
> We should remove the two parameters {{sourceName}} and {{processorName}}. To
> be backward compatible, the current method must be deprecated and a new
> method should be added with a reduced number of parameters.
> KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-233%3A+Simplify+StreamsBuilder%23addGlobalStore



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to