This is an automated email from the ASF dual-hosted git repository.

kishoreg pushed a commit to branch stream-level-consumer-refactor
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 8b0f2c844937e3f32aa654ac00b14c6cd295b38a
Author: kishoreg <[email protected]>
AuthorDate: Tue Dec 3 15:31:40 2019 -0800

    Simplifying interface related to StreamLevelConsumer
---
 .../core/realtime/impl/kafka/KafkaConsumerFactory.java |  7 +++----
 .../impl/kafka/KafkaHighLevelStreamConfig.java         |  6 +++---
 .../realtime/impl/kafka/KafkaStreamLevelConsumer.java  | 16 ++--------------
 .../realtime/impl/kafka2/KafkaConsumerFactory.java     |  4 ++--
 .../realtime/impl/kafka2/KafkaStreamLevelConsumer.java | 18 +++---------------
 .../impl/kafka2/KafkaStreamLevelStreamConfig.java      |  6 +++---
 .../manager/realtime/HLRealtimeSegmentDataManager.java | 15 ++++++++++++++-
 .../manager/realtime/LLRealtimeSegmentDataManager.java |  2 +-
 .../core/indexsegment/mutable/MutableSegment.java      |  2 +-
 .../core/indexsegment/mutable/MutableSegmentImpl.java  |  2 +-
 .../pinot/core/realtime/stream/MessageBatch.java       |  1 -
 .../pinot/core/realtime/stream}/RowMetadata.java       |  2 +-
 .../core/realtime/stream/StreamConsumerFactory.java    |  5 ++---
 .../core/realtime/stream/StreamMessageMetadata.java    |  3 ---
 .../impl/fakestream/FakeStreamConsumerFactory.java     |  2 +-
 .../FlakyConsumerRealtimeClusterIntegrationTest.java   |  9 ++++-----
 16 files changed, 41 insertions(+), 59 deletions(-)

diff --git 
a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaConsumerFactory.java
 
b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaConsumerFactory.java
index d91c2e9..df4770e 100644
--- 
a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaConsumerFactory.java
+++ 
b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaConsumerFactory.java
@@ -48,14 +48,13 @@ public class KafkaConsumerFactory extends 
StreamConsumerFactory {
    * @param clientId
    * @param tableName
    * @param schema
-   * @param instanceZKMetadata
-   * @param serverMetrics
+   * @param groupId
    * @return
    */
   @Override
   public StreamLevelConsumer createStreamLevelConsumer(String clientId, String 
tableName, Schema schema,
-      InstanceZKMetadata instanceZKMetadata, ServerMetrics serverMetrics) {
-    return new KafkaStreamLevelConsumer(clientId, tableName, _streamConfig, 
schema, instanceZKMetadata, serverMetrics);
+      String groupId) {
+    return new KafkaStreamLevelConsumer(clientId, tableName, _streamConfig, 
schema, groupId);
   }
 
   /**
diff --git 
a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaHighLevelStreamConfig.java
 
b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaHighLevelStreamConfig.java
index 27476a5..a1933ca 100644
--- 
a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaHighLevelStreamConfig.java
+++ 
b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaHighLevelStreamConfig.java
@@ -65,10 +65,10 @@ public class KafkaHighLevelStreamConfig {
    * Builds a wrapper around {@link StreamConfig} to fetch kafka stream level 
consumer specific configs
    * @param streamConfig
    * @param tableName
-   * @param instanceZKMetadata
+   * @param groupId
    */
   public KafkaHighLevelStreamConfig(StreamConfig streamConfig, String 
tableName,
-      InstanceZKMetadata instanceZKMetadata) {
+      String groupId) {
     Map<String, String> streamConfigMap = streamConfig.getStreamConfigsMap();
 
     _kafkaTopicName = streamConfig.getTopicName();
@@ -78,7 +78,7 @@ public class KafkaHighLevelStreamConfig {
     _zkBrokerUrl = streamConfigMap.get(hlcZkBrokerUrlKey);
     Preconditions.checkNotNull(_zkBrokerUrl,
         "Must specify zk broker connect string " + hlcZkBrokerUrlKey + " in 
high level kafka consumer");
-    _groupId = instanceZKMetadata.getGroupId(tableName);
+    _groupId = groupId;
 
     _kafkaConsumerProperties = new HashMap<>();
     String kafkaConsumerPropertyPrefix =
diff --git 
a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStreamLevelConsumer.java
 
b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStreamLevelConsumer.java
index 56d9f77..d6b14a8 100644
--- 
a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStreamLevelConsumer.java
+++ 
b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStreamLevelConsumer.java
@@ -55,16 +55,14 @@ public class KafkaStreamLevelConsumer implements 
StreamLevelConsumer {
   private long lastCount = 0;
   private long currentCount = 0L;
 
-  private ServerMetrics _serverMetrics;
   private Meter tableAndStreamRowsConsumed = null;
   private Meter tableRowsConsumed = null;
 
   public KafkaStreamLevelConsumer(String clientId, String tableName, 
StreamConfig streamConfig, Schema schema,
-      InstanceZKMetadata instanceZKMetadata, ServerMetrics serverMetrics) {
+      String groupId) {
     _clientId = clientId;
     _streamConfig = streamConfig;
-    _kafkaHighLevelStreamConfig = new KafkaHighLevelStreamConfig(streamConfig, 
tableName, instanceZKMetadata);
-    _serverMetrics = serverMetrics;
+    _kafkaHighLevelStreamConfig = new KafkaHighLevelStreamConfig(streamConfig, 
tableName, groupId);
 
     _messageDecoder = StreamDecoderProvider.create(streamConfig, schema);
 
@@ -87,12 +85,6 @@ public class KafkaStreamLevelConsumer implements 
StreamLevelConsumer {
     if (kafkaIterator.hasNext()) {
       try {
         destination = _messageDecoder.decode(kafkaIterator.next().message(), 
destination);
-        tableAndStreamRowsConsumed = _serverMetrics
-            .addMeteredTableValue(_tableAndStreamName, 
ServerMeter.REALTIME_ROWS_CONSUMED, 1L,
-                tableAndStreamRowsConsumed);
-        tableRowsConsumed =
-            
_serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_ROWS_CONSUMED, 1L, 
tableRowsConsumed);
-
         ++currentCount;
 
         final long now = System.currentTimeMillis();
@@ -110,8 +102,6 @@ public class KafkaStreamLevelConsumer implements 
StreamLevelConsumer {
         return destination;
       } catch (Exception e) {
         INSTANCE_LOGGER.warn("Caught exception while consuming events", e);
-        _serverMetrics.addMeteredTableValue(_tableAndStreamName, 
ServerMeter.REALTIME_CONSUMPTION_EXCEPTIONS, 1L);
-        
_serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_CONSUMPTION_EXCEPTIONS,
 1L);
         throw e;
       }
     }
@@ -121,8 +111,6 @@ public class KafkaStreamLevelConsumer implements 
StreamLevelConsumer {
   @Override
   public void commit() {
     consumer.commitOffsets();
-    _serverMetrics.addMeteredTableValue(_tableAndStreamName, 
ServerMeter.REALTIME_OFFSET_COMMITS, 1L);
-    _serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_OFFSET_COMMITS, 
1L);
   }
 
   @Override
diff --git 
a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConsumerFactory.java
 
b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConsumerFactory.java
index 919c366..d11f814 100644
--- 
a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConsumerFactory.java
+++ 
b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConsumerFactory.java
@@ -35,8 +35,8 @@ public class KafkaConsumerFactory extends 
StreamConsumerFactory {
 
   @Override
   public StreamLevelConsumer createStreamLevelConsumer(String clientId, String 
tableName, Schema schema,
-      InstanceZKMetadata instanceZKMetadata, ServerMetrics serverMetrics) {
-    return new KafkaStreamLevelConsumer(clientId, tableName, _streamConfig, 
schema, instanceZKMetadata, serverMetrics);
+      String groupId) {
+    return new KafkaStreamLevelConsumer(clientId, tableName, _streamConfig, 
schema, groupId);
   }
 
   @Override
diff --git 
a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamLevelConsumer.java
 
b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamLevelConsumer.java
index 4a91894..86a8c2b 100644
--- 
a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamLevelConsumer.java
+++ 
b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamLevelConsumer.java
@@ -65,16 +65,13 @@ public class KafkaStreamLevelConsumer implements 
StreamLevelConsumer {
   private long lastCount = 0;
   private long currentCount = 0L;
 
-  private ServerMetrics _serverMetrics;
-  private Meter tableAndStreamRowsConsumed = null;
-  private Meter tableRowsConsumed = null;
+
 
   public KafkaStreamLevelConsumer(String clientId, String tableName, 
StreamConfig streamConfig, Schema schema,
-      InstanceZKMetadata instanceZKMetadata, ServerMetrics serverMetrics) {
+      String groupId) {
     _clientId = clientId;
     _streamConfig = streamConfig;
-    _kafkaStreamLevelStreamConfig = new 
KafkaStreamLevelStreamConfig(streamConfig, tableName, instanceZKMetadata);
-    _serverMetrics = serverMetrics;
+    _kafkaStreamLevelStreamConfig = new 
KafkaStreamLevelStreamConfig(streamConfig, tableName, groupId);
 
     _messageDecoder = StreamDecoderProvider.create(streamConfig, schema);
 
@@ -112,11 +109,6 @@ public class KafkaStreamLevelConsumer implements 
StreamLevelConsumer {
         final ConsumerRecord<Bytes, Bytes> record = kafkaIterator.next();
         updateOffsets(record.partition(), record.offset());
         destination = _messageDecoder.decode(record.value().get(), 
destination);
-        tableAndStreamRowsConsumed = _serverMetrics
-            .addMeteredTableValue(_tableAndStreamName, 
ServerMeter.REALTIME_ROWS_CONSUMED, 1L,
-                tableAndStreamRowsConsumed);
-        tableRowsConsumed =
-            
_serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_ROWS_CONSUMED, 1L, 
tableRowsConsumed);
 
         ++currentCount;
 
@@ -135,8 +127,6 @@ public class KafkaStreamLevelConsumer implements 
StreamLevelConsumer {
         return destination;
       } catch (Exception e) {
         INSTANCE_LOGGER.warn("Caught exception while consuming events", e);
-        _serverMetrics.addMeteredTableValue(_tableAndStreamName, 
ServerMeter.REALTIME_CONSUMPTION_EXCEPTIONS, 1L);
-        
_serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_CONSUMPTION_EXCEPTIONS,
 1L);
         throw e;
       }
     }
@@ -153,8 +143,6 @@ public class KafkaStreamLevelConsumer implements 
StreamLevelConsumer {
     // Since the latest batch may not be consumed fully, we need to reset the 
kafka consumer's offset.
     resetOffsets();
     consumerOffsets.clear();
-    _serverMetrics.addMeteredTableValue(_tableAndStreamName, 
ServerMeter.REALTIME_OFFSET_COMMITS, 1L);
-    _serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_OFFSET_COMMITS, 
1L);
   }
 
   private Map<TopicPartition, OffsetAndMetadata> getOffsetsMap() {
diff --git 
a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamLevelStreamConfig.java
 
b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamLevelStreamConfig.java
index fe02d05..c1ca9c3 100644
--- 
a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamLevelStreamConfig.java
+++ 
b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamLevelStreamConfig.java
@@ -46,10 +46,10 @@ public class KafkaStreamLevelStreamConfig {
    * Builds a wrapper around {@link StreamConfig} to fetch kafka stream level 
consumer specific configs
    * @param streamConfig
    * @param tableName
-   * @param instanceZKMetadata
+   * @param groupId
    */
   public KafkaStreamLevelStreamConfig(StreamConfig streamConfig, String 
tableName,
-      InstanceZKMetadata instanceZKMetadata) {
+      String groupId) {
     Map<String, String> streamConfigMap = streamConfig.getStreamConfigsMap();
 
     _kafkaTopicName = streamConfig.getTopicName();
@@ -58,7 +58,7 @@ public class KafkaStreamLevelStreamConfig {
     _bootstrapServers = streamConfigMap.get(hlcBootstrapBrokerUrlKey);
     Preconditions.checkNotNull(_bootstrapServers,
         "Must specify bootstrap broker connect string " + 
hlcBootstrapBrokerUrlKey + " in high level kafka consumer");
-    _groupId = instanceZKMetadata.getGroupId(tableName);
+    _groupId = groupId;
 
     _kafkaConsumerProperties = new HashMap<>();
     String kafkaConsumerPropertyPrefix =
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
index c7062b0..f80b4fc 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.data.manager.realtime;
 
 import com.google.common.util.concurrent.Uninterruptibles;
+import com.yammer.metrics.core.Meter;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
@@ -94,6 +95,9 @@ public class HLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
   private Logger segmentLogger = LOGGER;
   private final SegmentVersion _segmentVersion;
 
+  private Meter tableAndStreamRowsConsumed = null;
+  private Meter tableRowsConsumed = null;
+
   // An instance of this class exists only for the duration of the realtime 
segment that is currently being consumed.
   // Once the segment is committed, the segment is handled by 
OfflineSegmentDataManager
   public HLRealtimeSegmentDataManager(final RealtimeSegmentZKMetadata 
realtimeSegmentZKMetadata,
@@ -164,7 +168,7 @@ public class HLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
     _streamConsumerFactory = 
StreamConsumerFactoryProvider.create(_streamConfig);
     String clientId = HLRealtimeSegmentDataManager.class.getSimpleName() + "-" 
+ _streamConfig.getTopicName();
     _streamLevelConsumer = _streamConsumerFactory
-        .createStreamLevelConsumer(clientId, tableNameWithType, schema, 
instanceMetadata, serverMetrics);
+        .createStreamLevelConsumer(clientId, tableNameWithType, schema, 
instanceMetadata.getGroupId(tableNameWithType));
     _streamLevelConsumer.start();
 
     tableStreamName = tableNameWithType + "_" + _streamConfig.getTopicName();
@@ -220,9 +224,16 @@ public class HLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
             GenericRow consumedRow;
             try {
               consumedRow = _streamLevelConsumer.next(reuse);
+              tableAndStreamRowsConsumed = serverMetrics
+                  .addMeteredTableValue(tableStreamName, 
ServerMeter.REALTIME_ROWS_CONSUMED, 1L,
+                      tableAndStreamRowsConsumed);
+              tableRowsConsumed =
+                  
serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_ROWS_CONSUMED, 1L, 
tableRowsConsumed);
             } catch (Exception e) {
               segmentLogger.warn("Caught exception while consuming row, 
sleeping for {} ms", exceptionSleepMillis, e);
               numRowsErrored++;
+              serverMetrics.addMeteredTableValue(tableStreamName, 
ServerMeter.REALTIME_CONSUMPTION_EXCEPTIONS, 1L);
+              
serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_CONSUMPTION_EXCEPTIONS,
 1L);
 
               // Sleep for a short time as to avoid filling the logs with 
exceptions too quickly
               Uninterruptibles.sleepUninterruptibly(exceptionSleepMillis, 
TimeUnit.MILLISECONDS);
@@ -297,6 +308,8 @@ public class HLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
             _streamLevelConsumer.shutdown();
             segmentLogger
                 .info("Successfully committed {} offsets, consumer release 
requested.", _streamConfig.getType());
+            serverMetrics.addMeteredTableValue(tableStreamName, 
ServerMeter.REALTIME_OFFSET_COMMITS, 1L);
+            
serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_OFFSET_COMMITS, 1L);
           } catch (Throwable e) {
             // If we got here, it means that either the commit or the shutdown 
failed. Considering that the
             // KafkaConsumerManager delays shutdown and only adds the consumer 
to be released in a deferred way, this
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 8192305..8efea23 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -41,7 +41,7 @@ import org.apache.pinot.common.config.SegmentPartitionConfig;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.common.data.StarTreeIndexSpec;
-import org.apache.pinot.common.metadata.RowMetadata;
+import org.apache.pinot.core.realtime.stream.RowMetadata;
 import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
 import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegment.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegment.java
index 14cffed..027baf1 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegment.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegment.java
@@ -19,7 +19,7 @@
 package org.apache.pinot.core.indexsegment.mutable;
 
 import javax.annotation.Nullable;
-import org.apache.pinot.common.metadata.RowMetadata;
+import org.apache.pinot.core.realtime.stream.RowMetadata;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.core.indexsegment.IndexSegment;
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
index 461c7ae..9259462 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
@@ -35,7 +35,7 @@ import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.MetricFieldSpec;
 import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.common.metadata.RowMetadata;
+import org.apache.pinot.core.realtime.stream.RowMetadata;
 import org.apache.pinot.common.segment.SegmentMetadata;
 import org.apache.pinot.common.utils.NetUtil;
 import org.apache.pinot.spi.data.readers.GenericRow;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/MessageBatch.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/MessageBatch.java
index c4c5428..cb20a13 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/MessageBatch.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/MessageBatch.java
@@ -20,7 +20,6 @@ package org.apache.pinot.core.realtime.stream;
 
 import org.apache.pinot.spi.annotations.InterfaceAudience;
 import org.apache.pinot.spi.annotations.InterfaceStability;
-import org.apache.pinot.common.metadata.RowMetadata;
 
 
 /**
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/RowMetadata.java 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/RowMetadata.java
similarity index 97%
rename from 
pinot-common/src/main/java/org/apache/pinot/common/metadata/RowMetadata.java
rename to 
pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/RowMetadata.java
index 7c25db0..bb4997b 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/RowMetadata.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/RowMetadata.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.common.metadata;
+package org.apache.pinot.core.realtime.stream;
 
 import org.apache.pinot.spi.annotations.InterfaceAudience;
 import org.apache.pinot.spi.annotations.InterfaceStability;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamConsumerFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamConsumerFactory.java
index 49fa512..1f6889b 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamConsumerFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamConsumerFactory.java
@@ -50,12 +50,11 @@ public abstract class StreamConsumerFactory {
    * @param clientId a client id to identify the creator of this consumer
    * @param tableName the table name for the topic of this consumer
    * @param schema the pinot schema of the event being consumed
-   * @param instanceZKMetadata the instance metadata
-   * @param serverMetrics metrics object to emit consumption related metrics
+   * @param groupId consumer group ID
    * @return
    */
   public abstract StreamLevelConsumer createStreamLevelConsumer(String 
clientId, String tableName, Schema schema,
-      InstanceZKMetadata instanceZKMetadata, ServerMetrics serverMetrics);
+      String groupId);
 
   /**
    * Creates a metadata provider which provides partition specific metadata
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageMetadata.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageMetadata.java
index 9ad416a..31983f0 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageMetadata.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageMetadata.java
@@ -18,9 +18,6 @@
  */
 package org.apache.pinot.core.realtime.stream;
 
-import org.apache.pinot.common.metadata.RowMetadata;
-
-
 /**
  * A class that provides metadata associated with the message of a stream, for 
e.g.,
  * ingestion-timestamp of the message.
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
index a6963db..678b3a6 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
@@ -49,7 +49,7 @@ public class FakeStreamConsumerFactory extends 
StreamConsumerFactory {
 
   @Override
   public StreamLevelConsumer createStreamLevelConsumer(String clientId, String 
tableName, Schema schema,
-      InstanceZKMetadata instanceZKMetadata, ServerMetrics serverMetrics) {
+      String groupId) {
     return new FakeStreamLevelConsumer();
   }
 
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
index a0de3bf..4be8c95 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
@@ -55,13 +55,13 @@ public class FlakyConsumerRealtimeClusterIntegrationTest 
extends RealtimeCluster
     private Random _random = new Random();
 
     public FlakyStreamLevelConsumer(String clientId, String tableName, 
StreamConfig streamConfig, Schema schema,
-        InstanceZKMetadata instanceZKMetadata, ServerMetrics serverMetrics) {
+        String groupId) {
       try {
         final Constructor constructor = 
Class.forName(KafkaStarterUtils.KAFKA_STREAM_LEVEL_CONSUMER_CLASS_NAME)
-            .getConstructor(String.class, String.class, StreamConfig.class, 
Schema.class, InstanceZKMetadata.class,
-                ServerMetrics.class);
+            .getConstructor(String.class, String.class, StreamConfig.class, 
Schema.class, String.class);
         _streamLevelConsumer = (StreamLevelConsumer) constructor
-            .newInstance(clientId, tableName, streamConfig, schema, 
instanceZKMetadata, serverMetrics);
+            .newInstance(clientId, tableName, streamConfig, schema, groupId);
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
@@ -114,9 +114,8 @@ public class FlakyConsumerRealtimeClusterIntegrationTest 
extends RealtimeCluster
 
     @Override
     public StreamLevelConsumer createStreamLevelConsumer(String clientId, 
String tableName, Schema schema,
-        InstanceZKMetadata instanceZKMetadata, ServerMetrics serverMetrics) {
-      return new FlakyStreamLevelConsumer(clientId, tableName, _streamConfig, 
schema, instanceZKMetadata,
-          serverMetrics);
+        String groupId) {
+      return new FlakyStreamLevelConsumer(clientId, tableName, _streamConfig, 
schema, groupId);
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to