This is an automated email from the ASF dual-hosted git repository. kishoreg pushed a commit to branch stream-level-consumer-refactor in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 8b0f2c844937e3f32aa654ac00b14c6cd295b38a Author: kishoreg <[email protected]> AuthorDate: Tue Dec 3 15:31:40 2019 -0800 Simplifying interface related to StreamLevelConsumer --- .../core/realtime/impl/kafka/KafkaConsumerFactory.java | 7 +++---- .../impl/kafka/KafkaHighLevelStreamConfig.java | 6 +++--- .../realtime/impl/kafka/KafkaStreamLevelConsumer.java | 16 ++-------------- .../realtime/impl/kafka2/KafkaConsumerFactory.java | 4 ++-- .../realtime/impl/kafka2/KafkaStreamLevelConsumer.java | 18 +++--------------- .../impl/kafka2/KafkaStreamLevelStreamConfig.java | 6 +++--- .../manager/realtime/HLRealtimeSegmentDataManager.java | 15 ++++++++++++++- .../manager/realtime/LLRealtimeSegmentDataManager.java | 2 +- .../core/indexsegment/mutable/MutableSegment.java | 2 +- .../core/indexsegment/mutable/MutableSegmentImpl.java | 2 +- .../pinot/core/realtime/stream/MessageBatch.java | 1 - .../pinot/core/realtime/stream}/RowMetadata.java | 2 +- .../core/realtime/stream/StreamConsumerFactory.java | 5 ++--- .../core/realtime/stream/StreamMessageMetadata.java | 3 --- .../impl/fakestream/FakeStreamConsumerFactory.java | 2 +- .../FlakyConsumerRealtimeClusterIntegrationTest.java | 9 ++++----- 16 files changed, 41 insertions(+), 59 deletions(-) diff --git a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaConsumerFactory.java b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaConsumerFactory.java index d91c2e9..df4770e 100644 --- a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaConsumerFactory.java +++ b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaConsumerFactory.java @@ -48,14 +48,13 @@ public class KafkaConsumerFactory extends StreamConsumerFactory { * @param clientId * @param tableName * @param schema - * @param instanceZKMetadata - * @param serverMetrics + * @param groupId * @return */ @Override public StreamLevelConsumer createStreamLevelConsumer(String clientId, String tableName, Schema schema, - InstanceZKMetadata instanceZKMetadata, ServerMetrics serverMetrics) { - return new KafkaStreamLevelConsumer(clientId, tableName, _streamConfig, schema, instanceZKMetadata, serverMetrics); + String groupId) { + return new KafkaStreamLevelConsumer(clientId, tableName, _streamConfig, schema, groupId); } /** diff --git a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaHighLevelStreamConfig.java b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaHighLevelStreamConfig.java index 27476a5..a1933ca 100644 --- a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaHighLevelStreamConfig.java +++ b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaHighLevelStreamConfig.java @@ -65,10 +65,10 @@ public class KafkaHighLevelStreamConfig { * Builds a wrapper around {@link StreamConfig} to fetch kafka stream level consumer specific configs * @param streamConfig * @param tableName - * @param instanceZKMetadata + * @param groupId */ public KafkaHighLevelStreamConfig(StreamConfig streamConfig, String tableName, - InstanceZKMetadata instanceZKMetadata) { + String groupId) { Map<String, String> streamConfigMap = streamConfig.getStreamConfigsMap(); _kafkaTopicName = streamConfig.getTopicName(); @@ -78,7 +78,7 @@ public class KafkaHighLevelStreamConfig { _zkBrokerUrl = streamConfigMap.get(hlcZkBrokerUrlKey); Preconditions.checkNotNull(_zkBrokerUrl, "Must specify zk broker connect string " + hlcZkBrokerUrlKey + " in high level kafka consumer"); - _groupId = instanceZKMetadata.getGroupId(tableName); + _groupId = groupId; _kafkaConsumerProperties = new HashMap<>(); String kafkaConsumerPropertyPrefix = diff --git a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStreamLevelConsumer.java b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStreamLevelConsumer.java index 56d9f77..d6b14a8 100644 --- a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStreamLevelConsumer.java +++ b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStreamLevelConsumer.java @@ -55,16 +55,14 @@ public class KafkaStreamLevelConsumer implements StreamLevelConsumer { private long lastCount = 0; private long currentCount = 0L; - private ServerMetrics _serverMetrics; private Meter tableAndStreamRowsConsumed = null; private Meter tableRowsConsumed = null; public KafkaStreamLevelConsumer(String clientId, String tableName, StreamConfig streamConfig, Schema schema, - InstanceZKMetadata instanceZKMetadata, ServerMetrics serverMetrics) { + String groupId) { _clientId = clientId; _streamConfig = streamConfig; - _kafkaHighLevelStreamConfig = new KafkaHighLevelStreamConfig(streamConfig, tableName, instanceZKMetadata); - _serverMetrics = serverMetrics; + _kafkaHighLevelStreamConfig = new KafkaHighLevelStreamConfig(streamConfig, tableName, groupId); _messageDecoder = StreamDecoderProvider.create(streamConfig, schema); @@ -87,12 +85,6 @@ public class KafkaStreamLevelConsumer implements StreamLevelConsumer { if (kafkaIterator.hasNext()) { try { destination = _messageDecoder.decode(kafkaIterator.next().message(), destination); - tableAndStreamRowsConsumed = _serverMetrics - .addMeteredTableValue(_tableAndStreamName, ServerMeter.REALTIME_ROWS_CONSUMED, 1L, - tableAndStreamRowsConsumed); - tableRowsConsumed = - _serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_ROWS_CONSUMED, 1L, tableRowsConsumed); - ++currentCount; final long now = System.currentTimeMillis(); @@ -110,8 +102,6 @@ public class KafkaStreamLevelConsumer implements StreamLevelConsumer { return destination; } catch (Exception e) { INSTANCE_LOGGER.warn("Caught exception while consuming events", e); - _serverMetrics.addMeteredTableValue(_tableAndStreamName, ServerMeter.REALTIME_CONSUMPTION_EXCEPTIONS, 1L); - _serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_CONSUMPTION_EXCEPTIONS, 1L); throw e; } } @@ -121,8 +111,6 @@ public class KafkaStreamLevelConsumer implements StreamLevelConsumer { @Override public void commit() { consumer.commitOffsets(); - _serverMetrics.addMeteredTableValue(_tableAndStreamName, ServerMeter.REALTIME_OFFSET_COMMITS, 1L); - _serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_OFFSET_COMMITS, 1L); } @Override diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConsumerFactory.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConsumerFactory.java index 919c366..d11f814 100644 --- a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConsumerFactory.java +++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConsumerFactory.java @@ -35,8 +35,8 @@ public class KafkaConsumerFactory extends StreamConsumerFactory { @Override public StreamLevelConsumer createStreamLevelConsumer(String clientId, String tableName, Schema schema, - InstanceZKMetadata instanceZKMetadata, ServerMetrics serverMetrics) { - return new KafkaStreamLevelConsumer(clientId, tableName, _streamConfig, schema, instanceZKMetadata, serverMetrics); + String groupId) { + return new KafkaStreamLevelConsumer(clientId, tableName, _streamConfig, schema, groupId); } @Override diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamLevelConsumer.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamLevelConsumer.java index 4a91894..86a8c2b 100644 --- a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamLevelConsumer.java +++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamLevelConsumer.java @@ -65,16 +65,13 @@ public class KafkaStreamLevelConsumer implements StreamLevelConsumer { private long lastCount = 0; private long currentCount = 0L; - private ServerMetrics _serverMetrics; - private Meter tableAndStreamRowsConsumed = null; - private Meter tableRowsConsumed = null; + public KafkaStreamLevelConsumer(String clientId, String tableName, StreamConfig streamConfig, Schema schema, - InstanceZKMetadata instanceZKMetadata, ServerMetrics serverMetrics) { + String groupId) { _clientId = clientId; _streamConfig = streamConfig; - _kafkaStreamLevelStreamConfig = new KafkaStreamLevelStreamConfig(streamConfig, tableName, instanceZKMetadata); - _serverMetrics = serverMetrics; + _kafkaStreamLevelStreamConfig = new KafkaStreamLevelStreamConfig(streamConfig, tableName, groupId); _messageDecoder = StreamDecoderProvider.create(streamConfig, schema); @@ -112,11 +109,6 @@ public class KafkaStreamLevelConsumer implements StreamLevelConsumer { final ConsumerRecord<Bytes, Bytes> record = kafkaIterator.next(); updateOffsets(record.partition(), record.offset()); destination = _messageDecoder.decode(record.value().get(), destination); - tableAndStreamRowsConsumed = _serverMetrics - .addMeteredTableValue(_tableAndStreamName, ServerMeter.REALTIME_ROWS_CONSUMED, 1L, - tableAndStreamRowsConsumed); - tableRowsConsumed = - _serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_ROWS_CONSUMED, 1L, tableRowsConsumed); ++currentCount; @@ -135,8 +127,6 @@ public class KafkaStreamLevelConsumer implements StreamLevelConsumer { return destination; } catch (Exception e) { INSTANCE_LOGGER.warn("Caught exception while consuming events", e); - _serverMetrics.addMeteredTableValue(_tableAndStreamName, ServerMeter.REALTIME_CONSUMPTION_EXCEPTIONS, 1L); - _serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_CONSUMPTION_EXCEPTIONS, 1L); throw e; } } @@ -153,8 +143,6 @@ public class KafkaStreamLevelConsumer implements StreamLevelConsumer { // Since the lastest batch may not be consumed fully, so we need to reset kafka consumer's offset. resetOffsets(); consumerOffsets.clear(); - _serverMetrics.addMeteredTableValue(_tableAndStreamName, ServerMeter.REALTIME_OFFSET_COMMITS, 1L); - _serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_OFFSET_COMMITS, 1L); } private Map<TopicPartition, OffsetAndMetadata> getOffsetsMap() { diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamLevelStreamConfig.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamLevelStreamConfig.java index fe02d05..c1ca9c3 100644 --- a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamLevelStreamConfig.java +++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamLevelStreamConfig.java @@ -46,10 +46,10 @@ public class KafkaStreamLevelStreamConfig { * Builds a wrapper around {@link StreamConfig} to fetch kafka stream level consumer specific configs * @param streamConfig * @param tableName - * @param instanceZKMetadata + * @param groupId */ public KafkaStreamLevelStreamConfig(StreamConfig streamConfig, String tableName, - InstanceZKMetadata instanceZKMetadata) { + String groupId) { Map<String, String> streamConfigMap = streamConfig.getStreamConfigsMap(); _kafkaTopicName = streamConfig.getTopicName(); @@ -58,7 +58,7 @@ public class KafkaStreamLevelStreamConfig { _bootstrapServers = streamConfigMap.get(hlcBootstrapBrokerUrlKey); Preconditions.checkNotNull(_bootstrapServers, "Must specify bootstrap broker connect string " + hlcBootstrapBrokerUrlKey + " in high level kafka consumer"); - _groupId = instanceZKMetadata.getGroupId(tableName); + _groupId = groupId; _kafkaConsumerProperties = new HashMap<>(); String kafkaConsumerPropertyPrefix = diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java index c7062b0..f80b4fc 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java @@ -19,6 +19,7 @@ package org.apache.pinot.core.data.manager.realtime; import com.google.common.util.concurrent.Uninterruptibles; +import com.yammer.metrics.core.Meter; import java.io.File; import java.util.ArrayList; import java.util.List; @@ -94,6 +95,9 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { private Logger segmentLogger = LOGGER; private final SegmentVersion _segmentVersion; + private Meter tableAndStreamRowsConsumed = null; + private Meter tableRowsConsumed = null; + // An instance of this class exists only for the duration of the realtime segment that is currently being consumed. // Once the segment is committed, the segment is handled by OfflineSegmentDataManager public HLRealtimeSegmentDataManager(final RealtimeSegmentZKMetadata realtimeSegmentZKMetadata, @@ -164,7 +168,7 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { _streamConsumerFactory = StreamConsumerFactoryProvider.create(_streamConfig); String clientId = HLRealtimeSegmentDataManager.class.getSimpleName() + "-" + _streamConfig.getTopicName(); _streamLevelConsumer = _streamConsumerFactory - .createStreamLevelConsumer(clientId, tableNameWithType, schema, instanceMetadata, serverMetrics); + .createStreamLevelConsumer(clientId, tableNameWithType, schema, instanceMetadata.getGroupId(tableNameWithType)); _streamLevelConsumer.start(); tableStreamName = tableNameWithType + "_" + _streamConfig.getTopicName(); @@ -220,9 +224,16 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { GenericRow consumedRow; try { consumedRow = _streamLevelConsumer.next(reuse); + tableAndStreamRowsConsumed = serverMetrics + .addMeteredTableValue(tableStreamName, ServerMeter.REALTIME_ROWS_CONSUMED, 1L, + tableAndStreamRowsConsumed); + tableRowsConsumed = + serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_ROWS_CONSUMED, 1L, tableRowsConsumed); } catch (Exception e) { segmentLogger.warn("Caught exception while consuming row, sleeping for {} ms", exceptionSleepMillis, e); numRowsErrored++; + serverMetrics.addMeteredTableValue(tableStreamName, ServerMeter.REALTIME_CONSUMPTION_EXCEPTIONS, 1L); + serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_CONSUMPTION_EXCEPTIONS, 1L); // Sleep for a short time as to avoid filling the logs with exceptions too quickly Uninterruptibles.sleepUninterruptibly(exceptionSleepMillis, TimeUnit.MILLISECONDS); @@ -297,6 +308,8 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { _streamLevelConsumer.shutdown(); segmentLogger .info("Successfully committed {} offsets, consumer release requested.", _streamConfig.getType()); + serverMetrics.addMeteredTableValue(tableStreamName, ServerMeter.REALTIME_OFFSET_COMMITS, 1L); + serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_OFFSET_COMMITS, 1L); } catch (Throwable e) { // If we got here, it means that either the commit or the shutdown failed. Considering that the // KafkaConsumerManager delays shutdown and only adds the consumer to be released in a deferred way, this diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java index 8192305..8efea23 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java @@ -41,7 +41,7 @@ import org.apache.pinot.common.config.SegmentPartitionConfig; import org.apache.pinot.common.config.TableConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.common.data.StarTreeIndexSpec; -import org.apache.pinot.common.metadata.RowMetadata; +import org.apache.pinot.core.realtime.stream.RowMetadata; import org.apache.pinot.common.metadata.instance.InstanceZKMetadata; import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata; import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegment.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegment.java index 14cffed..027baf1 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegment.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegment.java @@ -19,7 +19,7 @@ package org.apache.pinot.core.indexsegment.mutable; import javax.annotation.Nullable; -import org.apache.pinot.common.metadata.RowMetadata; +import org.apache.pinot.core.realtime.stream.RowMetadata; import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.core.indexsegment.IndexSegment; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java index 461c7ae..9259462 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java @@ -35,7 +35,7 @@ import org.apache.pinot.spi.data.DimensionFieldSpec; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.MetricFieldSpec; import org.apache.pinot.spi.data.Schema; -import org.apache.pinot.common.metadata.RowMetadata; +import org.apache.pinot.core.realtime.stream.RowMetadata; import org.apache.pinot.common.segment.SegmentMetadata; import org.apache.pinot.common.utils.NetUtil; import org.apache.pinot.spi.data.readers.GenericRow; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/MessageBatch.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/MessageBatch.java index c4c5428..cb20a13 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/MessageBatch.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/MessageBatch.java @@ -20,7 +20,6 @@ package org.apache.pinot.core.realtime.stream; import org.apache.pinot.spi.annotations.InterfaceAudience; import org.apache.pinot.spi.annotations.InterfaceStability; -import org.apache.pinot.common.metadata.RowMetadata; /** diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/RowMetadata.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/RowMetadata.java similarity index 97% rename from pinot-common/src/main/java/org/apache/pinot/common/metadata/RowMetadata.java rename to pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/RowMetadata.java index 7c25db0..bb4997b 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/RowMetadata.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/RowMetadata.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.common.metadata; +package org.apache.pinot.core.realtime.stream; import org.apache.pinot.spi.annotations.InterfaceAudience; import org.apache.pinot.spi.annotations.InterfaceStability; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamConsumerFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamConsumerFactory.java index 49fa512..1f6889b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamConsumerFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamConsumerFactory.java @@ -50,12 +50,11 @@ public abstract class StreamConsumerFactory { * @param clientId a client id to identify the creator of this consumer * @param tableName the table name for the topic of this consumer * @param schema the pinot schema of the event being consumed - * @param instanceZKMetadata the instance metadata - * @param serverMetrics metrics object to emit consumption related metrics + * @param groupId consumer group Id * @return */ public abstract StreamLevelConsumer createStreamLevelConsumer(String clientId, String tableName, Schema schema, - InstanceZKMetadata instanceZKMetadata, ServerMetrics serverMetrics); + String groupId); /** * Creates a metadata provider which provides partition specific metadata diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageMetadata.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageMetadata.java index 9ad416a..31983f0 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageMetadata.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageMetadata.java @@ -18,9 +18,6 @@ */ package org.apache.pinot.core.realtime.stream; -import org.apache.pinot.common.metadata.RowMetadata; - - /** * A class that provides metadata associated with the message of a stream, for e.g., * ingestion-timestamp of the message. diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java index a6963db..678b3a6 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java @@ -49,7 +49,7 @@ public class FakeStreamConsumerFactory extends StreamConsumerFactory { @Override public StreamLevelConsumer createStreamLevelConsumer(String clientId, String tableName, Schema schema, - InstanceZKMetadata instanceZKMetadata, ServerMetrics serverMetrics) { + String groupId) { return new FakeStreamLevelConsumer(); } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java index a0de3bf..4be8c95 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java @@ -55,13 +55,13 @@ public class FlakyConsumerRealtimeClusterIntegrationTest extends RealtimeCluster private Random _random = new Random(); public FlakyStreamLevelConsumer(String clientId, String tableName, StreamConfig streamConfig, Schema schema, - InstanceZKMetadata instanceZKMetadata, ServerMetrics serverMetrics) { + String groupId) { try { final Constructor constructor = Class.forName(KafkaStarterUtils.KAFKA_STREAM_LEVEL_CONSUMER_CLASS_NAME) .getConstructor(String.class, String.class, StreamConfig.class, Schema.class, InstanceZKMetadata.class, ServerMetrics.class); _streamLevelConsumer = (StreamLevelConsumer) constructor - .newInstance(clientId, tableName, streamConfig, schema, instanceZKMetadata, serverMetrics); + .newInstance(clientId, tableName, streamConfig, schema, groupId); } catch (Exception e) { throw new RuntimeException(e); } @@ -114,9 +114,8 @@ public class FlakyConsumerRealtimeClusterIntegrationTest extends RealtimeCluster @Override public StreamLevelConsumer createStreamLevelConsumer(String clientId, String tableName, Schema schema, - InstanceZKMetadata instanceZKMetadata, ServerMetrics serverMetrics) { - return new FlakyStreamLevelConsumer(clientId, tableName, _streamConfig, schema, instanceZKMetadata, - serverMetrics); + String groupId) { + return new FlakyStreamLevelConsumer(clientId, tableName, _streamConfig, schema, groupId); } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
