This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 83b058a  MINOR: Code cleanup (#4229)
83b058a is described below

commit 83b058a0d13955ade43440963f480c7fe3d94c6a
Author: Kamal C <kamal.chandraprak...@gmail.com>
AuthorDate: Tue Feb 13 10:24:30 2018 +0530

    MINOR: Code cleanup (#4229)
    
    Reviewers: Guozhang Wang <wangg...@gmail.com>, Damian Guy 
<dam...@confluent.io>, Paolo Patierno <ppatie...@live.com>, Ismael Juma 
<ism...@juma.me.uk>
---
 .../org/apache/kafka/common/metrics/Sensor.java    | 10 +++------
 .../kstream/internals/ConsumedInternal.java        |  9 ++++++++
 .../kstream/internals/InternalStreamsBuilder.java  | 26 +++++++++-------------
 3 files changed, 22 insertions(+), 23 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java 
b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
index 321fab6..837ac2e 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
@@ -79,8 +79,8 @@ public final class Sensor {
 
         public static RecordingLevel forId(int id) {
             if (id < MIN_RECORDING_LEVEL_KEY || id > MAX_RECORDING_LEVEL_KEY)
-                throw new IllegalArgumentException(String.format("Unexpected 
RecordLevel id `%s`, it should be between `%s` " +
-                    "and `%s` (inclusive)", id, MIN_RECORDING_LEVEL_KEY, 
MAX_RECORDING_LEVEL_KEY));
+                throw new IllegalArgumentException(String.format("Unexpected 
RecordLevel id `%d`, it should be between `%d` " +
+                    "and `%d` (inclusive)", id, MIN_RECORDING_LEVEL_KEY, 
MAX_RECORDING_LEVEL_KEY));
             return ID_TO_TYPE[id];
         }
 
@@ -90,11 +90,7 @@ public final class Sensor {
         }
 
         public boolean shouldRecord(final int configId) {
-            if (configId == DEBUG.id) {
-                return true;
-            } else {
-                return configId == this.id;
-            }
+            return configId == DEBUG.id || configId == this.id;
         }
 
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ConsumedInternal.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ConsumedInternal.java
index 5e8f74b..2438269 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ConsumedInternal.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ConsumedInternal.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.Topology;
@@ -42,10 +43,18 @@ public class ConsumedInternal<K, V> extends Consumed<K, V> {
         return keySerde;
     }
 
+    public Deserializer<K> keyDeserializer() {
+        return keySerde == null ? null : keySerde.deserializer();
+    }
+
     public Serde<V> valueSerde() {
         return valueSerde;
     }
 
+    public Deserializer<V> valueDeserializer() {
+        return valueSerde == null ? null : valueSerde.deserializer();
+    }
+
     public TimestampExtractor timestampExtractor() {
         return timestampExtractor;
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 0b028e6..fa47444 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
@@ -49,8 +48,8 @@ public class InternalStreamsBuilder implements 
InternalNameProvider {
         internalTopologyBuilder.addSource(consumed.offsetResetPolicy(),
                                           name,
                                           consumed.timestampExtractor(),
-                                          consumed.keySerde() == null ? null : 
consumed.keySerde().deserializer(),
-                                          consumed.valueSerde() == null ? null 
: consumed.valueSerde().deserializer(),
+                                          consumed.keyDeserializer(),
+                                          consumed.valueDeserializer(),
                                           topics.toArray(new 
String[topics.size()]));
 
         return new KStreamImpl<>(this, name, Collections.singleton(name), 
false);
@@ -62,8 +61,8 @@ public class InternalStreamsBuilder implements 
InternalNameProvider {
         internalTopologyBuilder.addSource(consumed.offsetResetPolicy(),
                                           name,
                                           consumed.timestampExtractor(),
-                                          consumed.keySerde() == null ? null : 
consumed.keySerde().deserializer(),
-                                          consumed.valueSerde() == null ? null 
: consumed.valueSerde().deserializer(),
+                                          consumed.keyDeserializer(),
+                                          consumed.valueDeserializer(),
                                           topicPattern);
 
         return new KStreamImpl<>(this, name, Collections.singleton(name), 
false);
@@ -124,8 +123,8 @@ public class InternalStreamsBuilder implements 
InternalNameProvider {
         internalTopologyBuilder.addSource(consumed.offsetResetPolicy(),
                                           source,
                                           consumed.timestampExtractor(),
-                                          consumed.keySerde() == null ? null : 
consumed.keySerde().deserializer(),
-                                          consumed.valueSerde() == null ? null 
: consumed.valueSerde().deserializer(),
+                                          consumed.keyDeserializer(),
+                                          consumed.valueDeserializer(),
                                           topic);
         internalTopologyBuilder.addProcessor(name, processorSupplier, source);
 
@@ -147,14 +146,11 @@ public class InternalStreamsBuilder implements 
InternalNameProvider {
         final KTableSource<K, V> tableSource = new 
KTableSource<>(storeBuilder.name());
 
 
-        final Deserializer<K> keyDeserializer = consumed.keySerde() == null ? 
null : consumed.keySerde().deserializer();
-        final Deserializer<V> valueDeserializer = consumed.valueSerde() == 
null ? null : consumed.valueSerde().deserializer();
-
         internalTopologyBuilder.addGlobalStore(storeBuilder,
                                                sourceName,
                                                consumed.timestampExtractor(),
-                                               keyDeserializer,
-                                               valueDeserializer,
+                                               consumed.keyDeserializer(),
+                                               consumed.valueDeserializer(),
                                                topic,
                                                processorName,
                                                tableSource);
@@ -183,13 +179,11 @@ public class InternalStreamsBuilder implements 
InternalNameProvider {
                                             final ProcessorSupplier 
stateUpdateSupplier) {
         // explicitly disable logging for global stores
         storeBuilder.withLoggingDisabled();
-        final Deserializer keyDeserializer = consumed.keySerde() == null ? 
null : consumed.keySerde().deserializer();
-        final Deserializer valueDeserializer = consumed.valueSerde() == null ? 
null : consumed.valueSerde().deserializer();
         internalTopologyBuilder.addGlobalStore(storeBuilder,
                                                sourceName,
                                                consumed.timestampExtractor(),
-                                               keyDeserializer,
-                                               valueDeserializer,
+                                               consumed.keyDeserializer(),
+                                               consumed.valueDeserializer(),
                                                topic,
                                                processorName,
                                                stateUpdateSupplier);

-- 
To stop receiving notification emails like this one, please contact
guozh...@apache.org.

Reply via email to