This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch sharded_consumer_type_support in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 652a3e68462fe007b59a3505c44bba3f0f190c3b Author: Neha Pawar <[email protected]> AuthorDate: Tue Dec 29 18:43:48 2020 -0800 StreamPartitionOffset to implement Checkpoint --- .../common/metadata/segment/RealtimeSegmentZKMetadata.java | 6 ++++++ .../org/apache/pinot/common/utils/CommonConstants.java | 4 ++++ .../impl/fakestream/FakeStreamConsumerFactory.java | 14 ++++++++++++++ .../tests/FlakyConsumerRealtimeClusterIntegrationTest.java | 14 ++++++++++++++ .../pinot/plugin/stream/kafka09/KafkaConsumerFactory.java | 14 ++++++++++++++ .../pinot/plugin/stream/kafka20/KafkaConsumerFactory.java | 14 ++++++++++++++ .../main/java/org/apache/pinot/spi/stream/Checkpoint.java | 6 +++--- .../java/org/apache/pinot/spi/stream/LongMsgOffset.java | 10 ++++++++++ .../apache/pinot/spi/stream/PartitionGroupConsumer.java | 5 ++++- .../apache/pinot/spi/stream/PartitionGroupMetadata.java | 2 -- .../apache/pinot/spi/stream/StreamPartitionMsgOffset.java | 2 +- 11 files changed, 84 insertions(+), 7 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/RealtimeSegmentZKMetadata.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/RealtimeSegmentZKMetadata.java index d88be18..c46af53 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/RealtimeSegmentZKMetadata.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/RealtimeSegmentZKMetadata.java @@ -35,6 +35,7 @@ public class RealtimeSegmentZKMetadata extends SegmentZKMetadata { private Status _status = null; private int _sizeThresholdToFlushSegment = -1; private String _timeThresholdToFlushSegment = null; // store as period string for readability + private String _partitionGroupMetadataStr = null; public RealtimeSegmentZKMetadata() { setSegmentType(SegmentType.REALTIME); @@ -49,6 +50,7 @@ public class RealtimeSegmentZKMetadata extends SegmentZKMetadata { if (flushThresholdTime != null && !flushThresholdTime.equals(NULL)) { _timeThresholdToFlushSegment = znRecord.getSimpleField(CommonConstants.Segment.FLUSH_THRESHOLD_TIME); } + _partitionGroupMetadataStr = znRecord.getSimpleField(CommonConstants.Segment.PARTITION_GROUP_METADATA); } @Override @@ -141,4 +143,8 @@ public class RealtimeSegmentZKMetadata extends SegmentZKMetadata { public void setTimeThresholdToFlushSegment(String timeThresholdPeriodString) { _timeThresholdToFlushSegment = timeThresholdPeriodString; } + + public String getPartitionGroupMetadataStr() { + return _partitionGroupMetadataStr; + } } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java index 9773e7e..7a91d8c 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java @@ -379,6 +379,10 @@ public class CommonConstants { public static final String FLUSH_THRESHOLD_TIME = "segment.flush.threshold.time"; public static final String PARTITION_METADATA = "segment.partition.metadata"; /** + * Serialized {@link org.apache.pinot.spi.stream.PartitionGroupMetadata} for this segment + */ + public static final String PARTITION_GROUP_METADATA = "segment.partition.group.metadata"; + /** * This field is used for parallel push protection to lock the segment globally. * We put the segment upload start timestamp so that if the previous push failed without unlock the segment, the * next upload won't be blocked forever. diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java index bb01e5c..9669223 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.core.realtime.impl.fakestream; +import java.util.List; import java.util.Set; import org.apache.pinot.core.util.IngestionUtils; import org.apache.pinot.spi.config.table.TableConfig; @@ -26,6 +27,8 @@ import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.stream.LongMsgOffset; import org.apache.pinot.spi.stream.MessageBatch; import org.apache.pinot.spi.stream.OffsetCriteria; +import org.apache.pinot.spi.stream.PartitionGroupConsumer; +import org.apache.pinot.spi.stream.PartitionGroupMetadata; import org.apache.pinot.spi.stream.PartitionLevelConsumer; import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.StreamConsumerFactory; @@ -66,6 +69,17 @@ public class FakeStreamConsumerFactory extends StreamConsumerFactory { return new FakeStreamMetadataProvider(_streamConfig); } + @Override + public List<PartitionGroupMetadata> getPartitionGroupMetadataList( + List<PartitionGroupMetadata> currentPartitionGroupsMetadata) { + return null; + } + + @Override + public PartitionGroupConsumer createConsumer(PartitionGroupMetadata metadata) { + return null; + } + public static void main(String[] args) throws Exception { String clientId = "client_id_localhost_tester"; diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java index b05244f..808a464 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java @@ -19,9 +19,12 @@ package org.apache.pinot.integration.tests; import java.lang.reflect.Constructor; +import java.util.List; import java.util.Random; import java.util.Set; import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.stream.PartitionGroupConsumer; +import org.apache.pinot.spi.stream.PartitionGroupMetadata; import org.apache.pinot.spi.stream.PartitionLevelConsumer; import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.StreamConsumerFactory; @@ -117,5 +120,16 @@ public class FlakyConsumerRealtimeClusterIntegrationTest extends RealtimeCluster public StreamMetadataProvider createStreamMetadataProvider(String clientId) { throw new UnsupportedOperationException(); } + + @Override + public List<PartitionGroupMetadata> getPartitionGroupMetadataList( + List<PartitionGroupMetadata> currentPartitionGroupsMetadata) { + return null; + } + + @Override + public PartitionGroupConsumer createConsumer(PartitionGroupMetadata metadata) { + return null; + } } } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java index 615e354..b8ed19d 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java @@ -18,7 +18,10 @@ */ package org.apache.pinot.plugin.stream.kafka09; +import java.util.List; import java.util.Set; +import org.apache.pinot.spi.stream.PartitionGroupConsumer; +import org.apache.pinot.spi.stream.PartitionGroupMetadata; import org.apache.pinot.spi.stream.PartitionLevelConsumer; import org.apache.pinot.spi.stream.StreamConsumerFactory; import org.apache.pinot.spi.stream.StreamLevelConsumer; @@ -50,4 +53,15 @@ public class KafkaConsumerFactory extends StreamConsumerFactory { public StreamMetadataProvider createStreamMetadataProvider(String clientId) { return new KafkaStreamMetadataProvider(clientId, _streamConfig); } + + @Override + public List<PartitionGroupMetadata> getPartitionGroupMetadataList( + List<PartitionGroupMetadata> currentPartitionGroupsMetadata) { + return null; + } + + @Override + public PartitionGroupConsumer createConsumer(PartitionGroupMetadata metadata) { + return null; + } } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java index e0d1015..806baff 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java @@ -18,7 +18,10 @@ */ package org.apache.pinot.plugin.stream.kafka20; +import java.util.List; import java.util.Set; +import org.apache.pinot.spi.stream.PartitionGroupConsumer; +import org.apache.pinot.spi.stream.PartitionGroupMetadata; import org.apache.pinot.spi.stream.PartitionLevelConsumer; import org.apache.pinot.spi.stream.StreamConsumerFactory; import org.apache.pinot.spi.stream.StreamLevelConsumer; @@ -47,4 +50,15 @@ public class KafkaConsumerFactory extends StreamConsumerFactory { public StreamMetadataProvider createStreamMetadataProvider(String clientId) { return new KafkaStreamMetadataProvider(clientId, _streamConfig); } + + @Override + public List<PartitionGroupMetadata> getPartitionGroupMetadataList( + List<PartitionGroupMetadata> currentPartitionGroupsMetadata) { + return null; + } + + @Override + public PartitionGroupConsumer createConsumer(PartitionGroupMetadata metadata) { + return null; + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/Checkpoint.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/Checkpoint.java index 627c964..bae8832 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/Checkpoint.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/Checkpoint.java @@ -18,7 +18,7 @@ */ package org.apache.pinot.spi.stream; -public interface Checkpoint { - byte[] serialize(); - Checkpoint deserialize(byte[] blob); +public interface Checkpoint extends Comparable { + String serialize(); + Checkpoint deserialize(String checkpointStr); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/LongMsgOffset.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/LongMsgOffset.java index e5025f6..e8fa275 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/LongMsgOffset.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/LongMsgOffset.java @@ -50,4 +50,14 @@ public class LongMsgOffset implements StreamPartitionMsgOffset { public String toString() { return Long.toString(_offset); } + + @Override + public String serialize() { + return toString(); + } + + @Override + public Checkpoint deserialize(String checkpointStr) { + return new LongMsgOffset(checkpointStr); + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java index 2f138c2..e096e67 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java @@ -18,6 +18,9 @@ */ package org.apache.pinot.spi.stream; -public interface PartitionGroupConsumer { +import java.io.Closeable; + + +public interface PartitionGroupConsumer extends Closeable { FetchResult fetch(Checkpoint start, Checkpoint end, long timeout); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java index 779c167..0f44173 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java @@ -25,8 +25,6 @@ public interface PartitionGroupMetadata { int getGroupId(); - List<String> getPartitions(); - Checkpoint getStartCheckpoint(); // similar to getStartOffset Checkpoint getEndCheckpoint(); // similar to getEndOffset diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffset.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffset.java index 72654bf..06a090e 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffset.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffset.java @@ -39,7 +39,7 @@ import org.apache.pinot.spi.annotations.InterfaceStability; * versions of the stream implementation */ @InterfaceStability.Evolving -public interface StreamPartitionMsgOffset extends Comparable { +public interface StreamPartitionMsgOffset extends Checkpoint { /** * Compare this offset with another one. --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
