This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 8e84fc3fc5 Optimize segment commit to not read partition group
metadata (#11943)
8e84fc3fc5 is described below
commit 8e84fc3fc50deeb3a05b24c09732c4116ad3979d
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Mon Nov 20 11:07:25 2023 -0800
Optimize segment commit to not read partition group metadata (#11943)
---
.../broker/broker/FakeStreamConsumerFactory.java | 6 ++
.../realtime/PinotLLCRealtimeSegmentManager.java | 117 +++++++++++++--------
.../segment/DefaultFlushThresholdUpdater.java | 4 +-
.../realtime/segment/FlushThresholdUpdater.java | 4 +-
.../segment/SegmentFlushThresholdComputer.java | 29 ++---
.../SegmentSizeBasedFlushThresholdUpdater.java | 14 +--
.../PinotLLCRealtimeSegmentManagerTest.java | 9 ++
.../segment/FlushThresholdUpdaterTest.java | 108 ++++---------------
.../segment/SegmentFlushThresholdComputerTest.java | 46 +++-----
.../fakestream/FakeStreamMetadataProvider.java | 8 ++
.../kafka20/KafkaStreamMetadataProvider.java | 19 ++++
.../pulsar/PulsarStreamMetadataProvider.java | 19 +++-
.../pinot/spi/stream/StreamMetadataProvider.java | 11 +-
13 files changed, 186 insertions(+), 208 deletions(-)
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/FakeStreamConsumerFactory.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/FakeStreamConsumerFactory.java
index 3ada727a13..b0fee61322 100644
---
a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/FakeStreamConsumerFactory.java
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/FakeStreamConsumerFactory.java
@@ -21,6 +21,7 @@ package org.apache.pinot.broker.broker;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.TimeoutException;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.MessageBatch;
@@ -77,6 +78,11 @@ public class FakeStreamConsumerFactory extends
StreamConsumerFactory {
return 1;
}
+ @Override
+ public Set<Integer> fetchPartitionIds(long timeoutMillis) {
+ return Collections.singleton(0);
+ }
+
@Override
public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria
offsetCriteria, long timeoutMillis)
throws TimeoutException {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 8611c251aa..90cd05d16b 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -92,7 +92,9 @@ import
org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
import org.apache.pinot.spi.utils.CommonConstants;
@@ -322,8 +324,7 @@ public class PinotLLCRealtimeSegmentManager {
for (PartitionGroupMetadata partitionGroupMetadata :
newPartitionGroupMetadataList) {
String segmentName =
setupNewPartitionGroup(tableConfig, streamConfig,
partitionGroupMetadata, currentTimeMs, instancePartitions,
- numPartitionGroups, numReplicas, newPartitionGroupMetadataList);
-
+ numPartitionGroups, numReplicas);
updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null,
segmentName, segmentAssignment,
instancePartitionsMap);
}
@@ -500,6 +501,10 @@ public class PinotLLCRealtimeSegmentManager {
LLCSegmentName committingLLCSegment = new
LLCSegmentName(committingSegmentName);
int committingSegmentPartitionGroupId =
committingLLCSegment.getPartitionGroupId();
LOGGER.info("Committing segment metadata for segment: {}",
committingSegmentName);
+ if (StringUtils.isBlank(committingSegmentDescriptor.getSegmentLocation()))
{
+ LOGGER.warn("Committing segment: {} was not uploaded to deep store",
committingSegmentName);
+ _controllerMetrics.addMeteredTableValue(realtimeTableName,
ControllerMeter.SEGMENT_MISSING_DEEP_STORE_LINK, 1);
+ }
TableConfig tableConfig = getTableConfig(realtimeTableName);
InstancePartitions instancePartitions =
getConsumingInstancePartitions(tableConfig);
@@ -518,40 +523,47 @@ public class PinotLLCRealtimeSegmentManager {
*/
// Step-1
+ long startTimeNs1 = System.nanoTime();
SegmentZKMetadata committingSegmentZKMetadata =
updateCommittingSegmentZKMetadata(realtimeTableName,
committingSegmentDescriptor);
// Refresh the Broker routing to reflect the changes in the segment ZK
metadata
_helixResourceManager.sendSegmentRefreshMessage(realtimeTableName,
committingSegmentName, false, true);
- // Using the latest segment of each partition group, creates a list of
{@link PartitionGroupConsumptionStatus}
- StreamConfig streamConfig =
- new StreamConfig(tableConfig.getTableName(),
IngestionConfigUtils.getStreamConfigMap(tableConfig));
- List<PartitionGroupConsumptionStatus>
currentPartitionGroupConsumptionStatusList =
- getPartitionGroupConsumptionStatusList(idealState, streamConfig);
-
- // Fetches new partition groups, given current list of {@link
PartitionGroupConsumptionStatus}.
- List<PartitionGroupMetadata> newPartitionGroupMetadataList =
- getNewPartitionGroupMetadataList(streamConfig,
currentPartitionGroupConsumptionStatusList);
- Set<Integer> newPartitionGroupSet =
-
newPartitionGroupMetadataList.stream().map(PartitionGroupMetadata::getPartitionGroupId)
- .collect(Collectors.toSet());
- int numPartitionGroups = newPartitionGroupMetadataList.size();
-
+ // Step-2
+ long startTimeNs2 = System.nanoTime();
String newConsumingSegmentName = null;
- if (!isTablePaused(idealState) &&
newPartitionGroupSet.contains(committingSegmentPartitionGroupId)) {
- // Only if committingSegment's partitionGroup is present in the
newPartitionGroupMetadataList, we create new
- // segment metadata
- String rawTableName =
TableNameBuilder.extractRawTableName(realtimeTableName);
- long newSegmentCreationTimeMs = getCurrentTimeMs();
- LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName,
committingSegmentPartitionGroupId,
- committingLLCSegment.getSequenceNumber() + 1,
newSegmentCreationTimeMs);
- createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment,
newSegmentCreationTimeMs,
- committingSegmentDescriptor, committingSegmentZKMetadata,
instancePartitions, numPartitionGroups, numReplicas,
- newPartitionGroupMetadataList);
- newConsumingSegmentName = newLLCSegment.getSegmentName();
+ if (!isTablePaused(idealState)) {
+ StreamConfig streamConfig =
+ new StreamConfig(tableConfig.getTableName(),
IngestionConfigUtils.getStreamConfigMap(tableConfig));
+ Set<Integer> partitionIds;
+ try {
+ partitionIds = getPartitionIds(streamConfig);
+ } catch (Exception e) {
+ LOGGER.info("Failed to fetch partition ids from stream metadata
provider for table: {}, exception: {}. "
+ + "Reading all partition group metadata to determine partition
ids.", realtimeTableName, e.toString());
+ // TODO: Find a better way to determine partition count and if the
committing partition group is fully consumed.
+ // We don't need to read partition group metadata for other
partition groups.
+ List<PartitionGroupConsumptionStatus>
currentPartitionGroupConsumptionStatusList =
+ getPartitionGroupConsumptionStatusList(idealState, streamConfig);
+ List<PartitionGroupMetadata> newPartitionGroupMetadataList =
+ getNewPartitionGroupMetadataList(streamConfig,
currentPartitionGroupConsumptionStatusList);
+ partitionIds =
newPartitionGroupMetadataList.stream().map(PartitionGroupMetadata::getPartitionGroupId)
+ .collect(Collectors.toSet());
+ }
+ if (partitionIds.contains(committingSegmentPartitionGroupId)) {
+ String rawTableName =
TableNameBuilder.extractRawTableName(realtimeTableName);
+ long newSegmentCreationTimeMs = getCurrentTimeMs();
+ LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName,
committingSegmentPartitionGroupId,
+ committingLLCSegment.getSequenceNumber() + 1,
newSegmentCreationTimeMs);
+ createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment,
newSegmentCreationTimeMs,
+ committingSegmentDescriptor, committingSegmentZKMetadata,
instancePartitions, partitionIds.size(),
+ numReplicas);
+ newConsumingSegmentName = newLLCSegment.getSegmentName();
+ }
}
// Step-3
+ long startTimeNs3 = System.nanoTime();
SegmentAssignment segmentAssignment =
SegmentAssignmentFactory.getSegmentAssignment(_helixManager,
tableConfig, _controllerMetrics);
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
@@ -572,6 +584,15 @@ public class PinotLLCRealtimeSegmentManager {
lock.unlock();
}
+ long endTimeNs = System.nanoTime();
+ LOGGER.info(
+ "Finished committing segment metadata for segment: {}. Time taken for
updating committing segment metadata: "
+ + "{}ms; creating new consuming segment ({}) metadata: {}ms;
updating ideal state: {}ms; total: {}ms",
+ committingSegmentName, TimeUnit.NANOSECONDS.toMillis(startTimeNs2 -
startTimeNs1), newConsumingSegmentName,
+ TimeUnit.NANOSECONDS.toMillis(startTimeNs3 - startTimeNs2),
+ TimeUnit.NANOSECONDS.toMillis(endTimeNs - startTimeNs3),
+ TimeUnit.NANOSECONDS.toMillis(endTimeNs - startTimeNs1));
+
// TODO: also create the new partition groups here, instead of waiting
till the {@link
// RealtimeSegmentValidationManager} runs
// E.g. If current state is A, B, C, and newPartitionGroupMetadataList
contains B, C, D, E,
@@ -581,10 +602,6 @@ public class PinotLLCRealtimeSegmentManager {
// Trigger the metadata event notifier
_metadataEventNotifierFactory.create().notifyOnSegmentFlush(tableConfig);
-
- if (StringUtils.isBlank(committingSegmentDescriptor.getSegmentLocation()))
{
- _controllerMetrics.addMeteredTableValue(realtimeTableName,
ControllerMeter.SEGMENT_MISSING_DEEP_STORE_LINK, 1);
- }
}
/**
@@ -649,8 +666,8 @@ public class PinotLLCRealtimeSegmentManager {
*/
private void createNewSegmentZKMetadata(TableConfig tableConfig,
StreamConfig streamConfig,
LLCSegmentName newLLCSegmentName, long creationTimeMs,
CommittingSegmentDescriptor committingSegmentDescriptor,
- @Nullable SegmentZKMetadata committingSegmentZKMetadata,
InstancePartitions instancePartitions,
- int numPartitionGroups, int numReplicas, List<PartitionGroupMetadata>
partitionGroupMetadataList) {
+ @Nullable SegmentZKMetadata committingSegmentZKMetadata,
InstancePartitions instancePartitions, int numPartitions,
+ int numReplicas) {
String realtimeTableName = tableConfig.getTableName();
String segmentName = newLLCSegmentName.getSegmentName();
String startOffset = committingSegmentDescriptor.getNextOffset();
@@ -668,7 +685,7 @@ public class PinotLLCRealtimeSegmentManager {
// Add the partition metadata if available
SegmentPartitionMetadata partitionMetadata =
- getPartitionMetadataFromTableConfig(tableConfig,
newLLCSegmentName.getPartitionGroupId(), numPartitionGroups);
+ getPartitionMetadataFromTableConfig(tableConfig,
newLLCSegmentName.getPartitionGroupId(), numPartitions);
if (partitionMetadata != null) {
newSegmentZKMetadata.setPartitionMetadata(partitionMetadata);
}
@@ -676,9 +693,7 @@ public class PinotLLCRealtimeSegmentManager {
// Update the flush threshold
FlushThresholdUpdater flushThresholdUpdater =
_flushThresholdUpdateManager.getFlushThresholdUpdater(streamConfig);
flushThresholdUpdater.updateFlushThreshold(streamConfig,
newSegmentZKMetadata, committingSegmentDescriptor,
- committingSegmentZKMetadata,
- getMaxNumPartitionsPerInstance(instancePartitions, numPartitionGroups,
numReplicas),
- partitionGroupMetadataList);
+ committingSegmentZKMetadata,
getMaxNumPartitionsPerInstance(instancePartitions, numPartitions, numReplicas));
persistSegmentZKMetadata(realtimeTableName, newSegmentZKMetadata, -1);
}
@@ -747,6 +762,22 @@ public class PinotLLCRealtimeSegmentManager {
return commitTimeoutMS;
}
+ /**
+ * Fetches the partition ids for the stream. Some streams (e.g. Kinesis)
might not support this operation, in which
+ * case an exception will be thrown.
+ */
+ @VisibleForTesting
+ Set<Integer> getPartitionIds(StreamConfig streamConfig)
+ throws Exception {
+ String clientId =
+ PinotLLCRealtimeSegmentManager.class.getSimpleName() + "-" +
streamConfig.getTableNameWithType() + "-"
+ + streamConfig.getTopicName();
+ StreamConsumerFactory consumerFactory =
StreamConsumerFactoryProvider.create(streamConfig);
+ try (StreamMetadataProvider metadataProvider =
consumerFactory.createStreamMetadataProvider(clientId)) {
+ return metadataProvider.fetchPartitionIds(5000L);
+ }
+ }
+
/**
* Fetches the latest state of the PartitionGroups for the stream
* If any partition has reached end of life, and all messages of that
partition have been consumed by the segment,
@@ -1122,8 +1153,7 @@ public class PinotLLCRealtimeSegmentManager {
new CommittingSegmentDescriptor(latestSegmentName,
(offsetFactory.create(latestSegmentZKMetadata.getEndOffset()).toString()), 0);
createNewSegmentZKMetadata(tableConfig, streamConfig,
newLLCSegmentName, currentTimeMs,
- committingSegmentDescriptor, latestSegmentZKMetadata,
instancePartitions, numPartitions, numReplicas,
- newPartitionGroupMetadataList);
+ committingSegmentDescriptor, latestSegmentZKMetadata,
instancePartitions, numPartitions, numReplicas);
updateInstanceStatesForNewConsumingSegment(instanceStatesMap,
latestSegmentName, newSegmentName,
segmentAssignment, instancePartitionsMap);
} else { // partition group reached end of life
@@ -1227,7 +1257,7 @@ public class PinotLLCRealtimeSegmentManager {
if (!latestSegmentZKMetadataMap.containsKey(partitionGroupId)) {
String newSegmentName =
setupNewPartitionGroup(tableConfig, streamConfig,
partitionGroupMetadata, currentTimeMs, instancePartitions,
- numPartitions, numReplicas, newPartitionGroupMetadataList);
+ numPartitions, numReplicas);
updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null,
newSegmentName, segmentAssignment,
instancePartitionsMap);
}
@@ -1248,7 +1278,7 @@ public class PinotLLCRealtimeSegmentManager {
CommittingSegmentDescriptor committingSegmentDescriptor =
new
CommittingSegmentDescriptor(latestSegmentZKMetadata.getSegmentName(),
startOffset.toString(), 0);
createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName,
currentTimeMs, committingSegmentDescriptor,
- latestSegmentZKMetadata, instancePartitions, numPartitions,
numReplicas, newPartitionGroupMetadataList);
+ latestSegmentZKMetadata, instancePartitions, numPartitions,
numReplicas);
String newSegmentName = newLLCSegmentName.getSegmentName();
updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null,
newSegmentName, segmentAssignment,
instancePartitionsMap);
@@ -1300,7 +1330,7 @@ public class PinotLLCRealtimeSegmentManager {
*/
private String setupNewPartitionGroup(TableConfig tableConfig, StreamConfig
streamConfig,
PartitionGroupMetadata partitionGroupMetadata, long creationTimeMs,
InstancePartitions instancePartitions,
- int numPartitionGroups, int numReplicas, List<PartitionGroupMetadata>
partitionGroupMetadataList) {
+ int numPartitions, int numReplicas) {
String realtimeTableName = tableConfig.getTableName();
int partitionGroupId = partitionGroupMetadata.getPartitionGroupId();
String startOffset = partitionGroupMetadata.getStartOffset().toString();
@@ -1313,8 +1343,7 @@ public class PinotLLCRealtimeSegmentManager {
CommittingSegmentDescriptor committingSegmentDescriptor = new
CommittingSegmentDescriptor(null, startOffset, 0);
createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName,
creationTimeMs,
- committingSegmentDescriptor, null, instancePartitions,
numPartitionGroups, numReplicas,
- partitionGroupMetadataList);
+ committingSegmentDescriptor, null, instancePartitions, numPartitions,
numReplicas);
return newSegmentName;
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/DefaultFlushThresholdUpdater.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/DefaultFlushThresholdUpdater.java
index e1dc7f9a80..57a4a11743 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/DefaultFlushThresholdUpdater.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/DefaultFlushThresholdUpdater.java
@@ -19,10 +19,8 @@
package org.apache.pinot.controller.helix.core.realtime.segment;
import com.google.common.annotations.VisibleForTesting;
-import java.util.List;
import javax.annotation.Nullable;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
@@ -40,7 +38,7 @@ public class DefaultFlushThresholdUpdater implements
FlushThresholdUpdater {
@Override
public void updateFlushThreshold(StreamConfig streamConfig,
SegmentZKMetadata newSegmentZKMetadata,
CommittingSegmentDescriptor committingSegmentDescriptor, @Nullable
SegmentZKMetadata committingSegmentZKMetadata,
- int maxNumPartitionsPerInstance, List<PartitionGroupMetadata>
partitionGroupMetadataList) {
+ int maxNumPartitionsPerInstance) {
// Configure the segment size flush limit based on the maximum number of
partitions allocated to an instance
newSegmentZKMetadata.setSizeThresholdToFlushSegment(_tableFlushSize /
maxNumPartitionsPerInstance);
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdater.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdater.java
index 5022ca398a..e322a367e0 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdater.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdater.java
@@ -18,10 +18,8 @@
*/
package org.apache.pinot.controller.helix.core.realtime.segment;
-import java.util.List;
import javax.annotation.Nullable;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
@@ -36,5 +34,5 @@ public interface FlushThresholdUpdater {
*/
void updateFlushThreshold(StreamConfig streamConfig, SegmentZKMetadata
newSegmentZKMetadata,
CommittingSegmentDescriptor committingSegmentDescriptor, @Nullable
SegmentZKMetadata committingSegmentZKMetadata,
- int maxNumPartitionsPerInstance, List<PartitionGroupMetadata>
partitionGroupMetadataList);
+ int maxNumPartitionsPerInstance);
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputer.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputer.java
index aba18df887..808642a345 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputer.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputer.java
@@ -20,11 +20,8 @@ package
org.apache.pinot.controller.helix.core.realtime.segment;
import com.google.common.annotations.VisibleForTesting;
import java.time.Clock;
-import java.util.List;
import javax.annotation.Nullable;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.utils.TimeUtils;
@@ -58,8 +55,7 @@ class SegmentFlushThresholdComputer {
}
public int computeThreshold(StreamConfig streamConfig,
CommittingSegmentDescriptor committingSegmentDescriptor,
- @Nullable SegmentZKMetadata committingSegmentZKMetadata,
List<PartitionGroupMetadata> partitionGroupMetadataList,
- String newSegmentName) {
+ @Nullable SegmentZKMetadata committingSegmentZKMetadata, String
newSegmentName) {
final long desiredSegmentSizeBytes =
streamConfig.getFlushThresholdSegmentSizeBytes();
final long optimalSegmentSizeBytesMin = desiredSegmentSizeBytes / 2;
final double optimalSegmentSizeBytesMax = desiredSegmentSizeBytes * 1.5;
@@ -99,24 +95,11 @@ class SegmentFlushThresholdComputer {
committingSegmentSizeBytes);
double currentRatio = (double) numRowsConsumed /
committingSegmentSizeBytes;
- // Compute segment size to rows ratio only from the lowest available
partition id.
- // If we consider all partitions then it is likely that we will assign a
much higher weight to the most
- // recent trend in the table (since it is usually true that all partitions
of the same table follow more or
- // less same characteristics at any one point in time).
- // However, when we start a new table or change controller mastership, we
can have any partition completing first.
- // It is best to learn the ratio as quickly as we can, so we allow any
partition to supply the value.
- int smallestAvailablePartitionGroupId =
-
partitionGroupMetadataList.stream().mapToInt(PartitionGroupMetadata::getPartitionGroupId).min().orElse(0);
-
- if (new LLCSegmentName(newSegmentName).getPartitionGroupId() ==
smallestAvailablePartitionGroupId
- || _latestSegmentRowsToSizeRatio == 0) {
- if (_latestSegmentRowsToSizeRatio > 0) {
- _latestSegmentRowsToSizeRatio =
- CURRENT_SEGMENT_RATIO_WEIGHT * currentRatio
- + PREVIOUS_SEGMENT_RATIO_WEIGHT *
_latestSegmentRowsToSizeRatio;
- } else {
- _latestSegmentRowsToSizeRatio = currentRatio;
- }
+ if (_latestSegmentRowsToSizeRatio > 0) {
+ _latestSegmentRowsToSizeRatio =
+ CURRENT_SEGMENT_RATIO_WEIGHT * currentRatio +
PREVIOUS_SEGMENT_RATIO_WEIGHT * _latestSegmentRowsToSizeRatio;
+ } else {
+ _latestSegmentRowsToSizeRatio = currentRatio;
}
// If the number of rows consumed is less than what we set as target in
metadata, then the segment hit time limit.
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
index 439279afbd..511e85651b 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
@@ -18,11 +18,8 @@
*/
package org.apache.pinot.controller.helix.core.realtime.segment;
-import com.google.common.annotations.VisibleForTesting;
-import java.util.List;
import javax.annotation.Nullable;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,18 +30,13 @@ import org.slf4j.LoggerFactory;
* previous segment
* The formula used to compute new number of rows is:
* targetNumRows = ideal_segment_size * (a * current_rows_to_size_ratio + b *
previous_rows_to_size_ratio)
- * where a = 0.25, b = 0.75, prev ratio= ratio collected over all previous
segment completions
+ * where a = 0.1, b = 0.9, prev ratio= ratio collected over all previous
segment completions
* This ensures that we take into account the history of the segment size and
number rows
*/
public class SegmentSizeBasedFlushThresholdUpdater implements
FlushThresholdUpdater {
public static final Logger LOGGER =
LoggerFactory.getLogger(SegmentSizeBasedFlushThresholdUpdater.class);
private final SegmentFlushThresholdComputer _flushThresholdComputer;
- @VisibleForTesting
- double getLatestSegmentRowsToSizeRatio() {
- return _flushThresholdComputer.getLatestSegmentRowsToSizeRatio();
- }
-
public SegmentSizeBasedFlushThresholdUpdater() {
_flushThresholdComputer = new SegmentFlushThresholdComputer();
}
@@ -53,10 +45,10 @@ public class SegmentSizeBasedFlushThresholdUpdater
implements FlushThresholdUpda
@Override
public synchronized void updateFlushThreshold(StreamConfig streamConfig,
SegmentZKMetadata newSegmentZKMetadata,
CommittingSegmentDescriptor committingSegmentDescriptor, @Nullable
SegmentZKMetadata committingSegmentZKMetadata,
- int maxNumPartitionsPerInstance, List<PartitionGroupMetadata>
partitionGroupMetadataList) {
+ int maxNumPartitionsPerInstance) {
int threshold =
_flushThresholdComputer.computeThreshold(streamConfig,
committingSegmentDescriptor, committingSegmentZKMetadata,
- partitionGroupMetadataList, newSegmentZKMetadata.getSegmentName());
+ newSegmentZKMetadata.getSegmentName());
newSegmentZKMetadata.setSizeThresholdToFlushSegment(threshold);
}
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index ffc98692a0..92d7ec19b4 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -31,6 +31,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -1220,6 +1221,14 @@ public class PinotLLCRealtimeSegmentManagerTest {
segmentAssignment, instancePartitionsMap);
}
+ @Override
+ Set<Integer> getPartitionIds(StreamConfig streamConfig) {
+ if (_partitionGroupMetadataList != null) {
+ throw new UnsupportedOperationException();
+ }
+ return IntStream.range(0,
_numPartitions).boxed().collect(Collectors.toSet());
+ }
+
@Override
List<PartitionGroupMetadata> getNewPartitionGroupMetadataList(StreamConfig
streamConfig,
List<PartitionGroupConsumptionStatus>
currentPartitionGroupConsumptionStatusList) {
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
index fbdaec5d3e..1184f113d2 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
@@ -18,14 +18,10 @@
*/
package org.apache.pinot.controller.helix.core.realtime.segment;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.spi.stream.LongMsgOffset;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.testng.annotations.Test;
@@ -134,9 +130,8 @@ public class FlushThresholdUpdaterTest {
// Start consumption
SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(0);
CommittingSegmentDescriptor committingSegmentDescriptor =
getCommittingSegmentDescriptor(0L);
- flushThresholdUpdater
- .updateFlushThreshold(streamConfig, newSegmentZKMetadata,
committingSegmentDescriptor, null, 1,
- Collections.emptyList());
+ flushThresholdUpdater.updateFlushThreshold(streamConfig,
newSegmentZKMetadata, committingSegmentDescriptor, null,
+ 1);
assertEquals(newSegmentZKMetadata.getSizeThresholdToFlushSegment(),
streamConfig.getFlushAutotuneInitialRows());
int numRuns = 500;
@@ -148,7 +143,7 @@ public class FlushThresholdUpdaterTest {
SegmentZKMetadata committingSegmentZKMetadata =
getCommittingSegmentZKMetadata(System.currentTimeMillis(),
numRowsConsumed, numRowsConsumed);
flushThresholdUpdater.updateFlushThreshold(streamConfig,
newSegmentZKMetadata, committingSegmentDescriptor,
- committingSegmentZKMetadata, 1, Collections.emptyList());
+ committingSegmentZKMetadata, 1);
// Assert that segment size is in limits
if (run > checkRunsAfter) {
@@ -172,9 +167,8 @@ public class FlushThresholdUpdaterTest {
// Start consumption
SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(1);
CommittingSegmentDescriptor committingSegmentDescriptor =
getCommittingSegmentDescriptor(0L);
- flushThresholdUpdater
- .updateFlushThreshold(streamConfig, newSegmentZKMetadata,
committingSegmentDescriptor, null, 1,
- getPartitionGroupMetadataList(3, 1));
+ flushThresholdUpdater.updateFlushThreshold(streamConfig,
newSegmentZKMetadata, committingSegmentDescriptor, null,
+ 1);
assertEquals(newSegmentZKMetadata.getSizeThresholdToFlushSegment(),
streamConfig.getFlushAutotuneInitialRows());
int numRuns = 500;
@@ -186,7 +180,7 @@ public class FlushThresholdUpdaterTest {
SegmentZKMetadata committingSegmentZKMetadata =
getCommittingSegmentZKMetadata(System.currentTimeMillis(),
numRowsConsumed, numRowsConsumed);
flushThresholdUpdater.updateFlushThreshold(streamConfig,
newSegmentZKMetadata, committingSegmentDescriptor,
- committingSegmentZKMetadata, 1, getPartitionGroupMetadataList(3,
1));
+ committingSegmentZKMetadata, 1);
// Assert that segment size is in limits
if (run > checkRunsAfter) {
@@ -201,16 +195,6 @@ public class FlushThresholdUpdaterTest {
new LLCSegmentName(RAW_TABLE_NAME, partitionId, 0,
System.currentTimeMillis()).getSegmentName());
}
- private List<PartitionGroupMetadata> getPartitionGroupMetadataList(int
numPartitions, int startPartitionId) {
- List<PartitionGroupMetadata> newPartitionGroupMetadataList = new
ArrayList<>();
-
- for (int i = 0; i < numPartitions; i++) {
- newPartitionGroupMetadataList.add(new
PartitionGroupMetadata(startPartitionId + i, null));
- }
-
- return newPartitionGroupMetadataList;
- }
-
private CommittingSegmentDescriptor getCommittingSegmentDescriptor(long
segmentSizeBytes) {
return new CommittingSegmentDescriptor(null, new
LongMsgOffset(0).toString(), segmentSizeBytes);
}
@@ -244,8 +228,8 @@ public class FlushThresholdUpdaterTest {
// Start consumption
SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(0);
CommittingSegmentDescriptor committingSegmentDescriptor =
getCommittingSegmentDescriptor(0L);
- flushThresholdUpdater.updateFlushThreshold(streamConfig,
newSegmentZKMetadata, committingSegmentDescriptor, null, 1,
- Collections.emptyList());
+ flushThresholdUpdater.updateFlushThreshold(streamConfig,
newSegmentZKMetadata, committingSegmentDescriptor, null,
+ 1);
int sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
// First segment consumes rows less than the threshold
@@ -254,7 +238,7 @@ public class FlushThresholdUpdaterTest {
SegmentZKMetadata committingSegmentZKMetadata =
getCommittingSegmentZKMetadata(System.currentTimeMillis(),
sizeThreshold, numRowsConsumed);
flushThresholdUpdater.updateFlushThreshold(streamConfig,
newSegmentZKMetadata, committingSegmentDescriptor,
- committingSegmentZKMetadata, 1, Collections.emptyList());
+ committingSegmentZKMetadata, 1);
sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
assertEquals(sizeThreshold,
(int) (numRowsConsumed *
SegmentFlushThresholdComputer.ROWS_MULTIPLIER_WHEN_TIME_THRESHOLD_HIT));
@@ -264,7 +248,7 @@ public class FlushThresholdUpdaterTest {
committingSegmentZKMetadata =
getCommittingSegmentZKMetadata(System.currentTimeMillis(),
sizeThreshold, numRowsConsumed);
flushThresholdUpdater.updateFlushThreshold(streamConfig,
newSegmentZKMetadata, committingSegmentDescriptor,
- committingSegmentZKMetadata, 1, Collections.emptyList());
+ committingSegmentZKMetadata, 1);
assertNotEquals(newSegmentZKMetadata.getSizeThresholdToFlushSegment(),
(int) (numRowsConsumed *
SegmentFlushThresholdComputer.ROWS_MULTIPLIER_WHEN_TIME_THRESHOLD_HIT));
}
@@ -277,8 +261,8 @@ public class FlushThresholdUpdaterTest {
// Start consumption
SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(0);
CommittingSegmentDescriptor committingSegmentDescriptor =
getCommittingSegmentDescriptor(0L);
- flushThresholdUpdater.updateFlushThreshold(streamConfig,
newSegmentZKMetadata, committingSegmentDescriptor, null, 1,
- Collections.emptyList());
+ flushThresholdUpdater.updateFlushThreshold(streamConfig,
newSegmentZKMetadata, committingSegmentDescriptor, null,
+ 1);
int sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
// First segment only consumed 15 rows, so next segment should have size
threshold of 10_000
@@ -287,7 +271,7 @@ public class FlushThresholdUpdaterTest {
SegmentZKMetadata committingSegmentZKMetadata =
getCommittingSegmentZKMetadata(System.currentTimeMillis(),
sizeThreshold, numRowsConsumed);
flushThresholdUpdater.updateFlushThreshold(streamConfig,
newSegmentZKMetadata, committingSegmentDescriptor,
- committingSegmentZKMetadata, 1, Collections.emptyList());
+ committingSegmentZKMetadata, 1);
sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
assertEquals(sizeThreshold,
SegmentFlushThresholdComputer.MINIMUM_NUM_ROWS_THRESHOLD);
@@ -296,63 +280,11 @@ public class FlushThresholdUpdaterTest {
committingSegmentZKMetadata =
getCommittingSegmentZKMetadata(System.currentTimeMillis(),
sizeThreshold, numRowsConsumed);
flushThresholdUpdater.updateFlushThreshold(streamConfig,
newSegmentZKMetadata, committingSegmentDescriptor,
- committingSegmentZKMetadata, 1, Collections.emptyList());
+ committingSegmentZKMetadata, 1);
sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
assertEquals(sizeThreshold,
SegmentFlushThresholdComputer.MINIMUM_NUM_ROWS_THRESHOLD);
}
- @Test
- public void testNonZeroPartitionUpdates() {
- SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater = new
SegmentSizeBasedFlushThresholdUpdater();
- StreamConfig streamConfig = mockDefaultAutotuneStreamConfig();
-
- // Start consumption for 2 partitions
- SegmentZKMetadata newSegmentZKMetadataForPartition0 =
getNewSegmentZKMetadata(0);
- SegmentZKMetadata newSegmentZKMetadataForPartition1 =
getNewSegmentZKMetadata(1);
- CommittingSegmentDescriptor committingSegmentDescriptor =
getCommittingSegmentDescriptor(0L);
- flushThresholdUpdater
- .updateFlushThreshold(streamConfig, newSegmentZKMetadataForPartition0,
committingSegmentDescriptor, null, 1,
- Collections.emptyList());
- flushThresholdUpdater
- .updateFlushThreshold(streamConfig, newSegmentZKMetadataForPartition1,
committingSegmentDescriptor, null, 1,
- Collections.emptyList());
- int sizeThresholdForPartition0 =
newSegmentZKMetadataForPartition0.getSizeThresholdToFlushSegment();
- int sizeThresholdForPartition1 =
newSegmentZKMetadataForPartition1.getSizeThresholdToFlushSegment();
- double sizeRatio = flushThresholdUpdater.getLatestSegmentRowsToSizeRatio();
- assertEquals(sizeThresholdForPartition0,
streamConfig.getFlushAutotuneInitialRows());
- assertEquals(sizeThresholdForPartition1,
streamConfig.getFlushAutotuneInitialRows());
- assertEquals(sizeRatio, 0.0);
-
- // First segment from partition 1 should change the size ratio
- committingSegmentDescriptor = getCommittingSegmentDescriptor(128_000_000L);
- SegmentZKMetadata committingSegmentZKMetadata =
- getCommittingSegmentZKMetadata(System.currentTimeMillis(),
sizeThresholdForPartition1,
- sizeThresholdForPartition1);
- flushThresholdUpdater
- .updateFlushThreshold(streamConfig, newSegmentZKMetadataForPartition1,
committingSegmentDescriptor,
- committingSegmentZKMetadata, 1, Collections.emptyList());
- sizeThresholdForPartition1 =
newSegmentZKMetadataForPartition1.getSizeThresholdToFlushSegment();
- sizeRatio = flushThresholdUpdater.getLatestSegmentRowsToSizeRatio();
- assertTrue(sizeRatio > 0.0);
-
- // Second segment update from partition 1 should not change the size ratio
- committingSegmentDescriptor = getCommittingSegmentDescriptor(256_000_000L);
- committingSegmentZKMetadata =
getCommittingSegmentZKMetadata(System.currentTimeMillis(),
sizeThresholdForPartition1,
- sizeThresholdForPartition1);
- flushThresholdUpdater
- .updateFlushThreshold(streamConfig, newSegmentZKMetadataForPartition1,
committingSegmentDescriptor,
- committingSegmentZKMetadata, 1, Collections.emptyList());
- assertEquals(flushThresholdUpdater.getLatestSegmentRowsToSizeRatio(),
sizeRatio);
-
- // First segment update from partition 0 should change the size ratio
- committingSegmentZKMetadata =
getCommittingSegmentZKMetadata(System.currentTimeMillis(),
sizeThresholdForPartition0,
- sizeThresholdForPartition0);
- flushThresholdUpdater
- .updateFlushThreshold(streamConfig, newSegmentZKMetadataForPartition0,
committingSegmentDescriptor,
- committingSegmentZKMetadata, 1, Collections.emptyList());
- assertNotEquals(flushThresholdUpdater.getLatestSegmentRowsToSizeRatio(),
sizeRatio);
- }
-
@Test
public void testSegmentSizeBasedUpdaterWithModifications() {
SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater = new
SegmentSizeBasedFlushThresholdUpdater();
@@ -367,8 +299,8 @@ public class FlushThresholdUpdaterTest {
// Start consumption
SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(0);
CommittingSegmentDescriptor committingSegmentDescriptor =
getCommittingSegmentDescriptor(0L);
- flushThresholdUpdater.updateFlushThreshold(streamConfig,
newSegmentZKMetadata, committingSegmentDescriptor, null, 1,
- Collections.emptyList());
+ flushThresholdUpdater.updateFlushThreshold(streamConfig,
newSegmentZKMetadata, committingSegmentDescriptor, null,
+ 1);
int sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
assertEquals(sizeThreshold, flushAutotuneInitialRows);
@@ -382,7 +314,7 @@ public class FlushThresholdUpdaterTest {
SegmentZKMetadata committingSegmentZKMetadata =
getCommittingSegmentZKMetadata(creationTime, sizeThreshold,
numRowsConsumed);
flushThresholdUpdater.updateFlushThreshold(streamConfig,
newSegmentZKMetadata, committingSegmentDescriptor,
- committingSegmentZKMetadata, 1, Collections.emptyList());
+ committingSegmentZKMetadata, 1);
sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
assertTrue(sizeThreshold > numRowsConsumed);
@@ -395,7 +327,7 @@ public class FlushThresholdUpdaterTest {
mockAutotuneStreamConfig(flushSegmentDesiredSizeBytes,
flushThresholdTimeMillis, flushAutotuneInitialRows);
committingSegmentZKMetadata = getCommittingSegmentZKMetadata(creationTime,
sizeThreshold, numRowsConsumed);
flushThresholdUpdater.updateFlushThreshold(streamConfig,
newSegmentZKMetadata, committingSegmentDescriptor,
- committingSegmentZKMetadata, 1, Collections.emptyList());
+ committingSegmentZKMetadata, 1);
sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
assertTrue(sizeThreshold < numRowsConsumed);
@@ -406,7 +338,7 @@ public class FlushThresholdUpdaterTest {
committingSegmentDescriptor =
getCommittingSegmentDescriptor(committingSegmentSize);
committingSegmentZKMetadata = getCommittingSegmentZKMetadata(creationTime,
sizeThreshold, numRowsConsumed);
flushThresholdUpdater.updateFlushThreshold(streamConfig,
newSegmentZKMetadata, committingSegmentDescriptor,
- committingSegmentZKMetadata, 1, Collections.emptyList());
+ committingSegmentZKMetadata, 1);
sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
assertEquals(sizeThreshold,
(long) (numRowsConsumed *
SegmentFlushThresholdComputer.ROWS_MULTIPLIER_WHEN_TIME_THRESHOLD_HIT));
@@ -419,7 +351,7 @@ public class FlushThresholdUpdaterTest {
mockAutotuneStreamConfig(flushSegmentDesiredSizeBytes,
flushThresholdTimeMillis, flushAutotuneInitialRows);
committingSegmentZKMetadata = getCommittingSegmentZKMetadata(creationTime,
sizeThreshold, numRowsConsumed);
flushThresholdUpdater.updateFlushThreshold(streamConfig,
newSegmentZKMetadata, committingSegmentDescriptor,
- committingSegmentZKMetadata, 1, Collections.emptyList());
+ committingSegmentZKMetadata, 1);
sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
assertTrue(sizeThreshold < numRowsConsumed);
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputerTest.java
index 820b40a491..9bdc1656a0 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputerTest.java
@@ -20,11 +20,8 @@ package
org.apache.pinot.controller.helix.core.realtime.segment;
import java.time.Clock;
import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
import org.testng.annotations.Test;
@@ -35,6 +32,7 @@ import static org.testng.Assert.assertEquals;
public class SegmentFlushThresholdComputerTest {
+
@Test
public void testUseAutoTuneInitialRowsIfFirstSegmentInPartition() {
int autoTuneInitialRows = 1_000;
@@ -44,11 +42,8 @@ public class SegmentFlushThresholdComputerTest {
when(streamConfig.getFlushAutotuneInitialRows()).thenReturn(autoTuneInitialRows);
CommittingSegmentDescriptor committingSegmentDescriptor =
mock(CommittingSegmentDescriptor.class);
- SegmentZKMetadata committingSegmentZKMetadata = null;
- List<PartitionGroupMetadata> partitionGroupMetadataList = new
ArrayList<>();
- int threshold = computer.computeThreshold(streamConfig,
committingSegmentDescriptor, committingSegmentZKMetadata,
- partitionGroupMetadataList, "newSegmentName");
+ int threshold = computer.computeThreshold(streamConfig,
committingSegmentDescriptor, null, "newSegmentName");
assertEquals(threshold, autoTuneInitialRows);
}
@@ -64,11 +59,8 @@ public class SegmentFlushThresholdComputerTest {
when(streamConfig.getFlushThresholdSegmentSizeBytes()).thenReturn(segmentSizeBytes);
CommittingSegmentDescriptor committingSegmentDescriptor =
mock(CommittingSegmentDescriptor.class);
- SegmentZKMetadata committingSegmentZKMetadata = null;
- List<PartitionGroupMetadata> partitionGroupMetadataList = new
ArrayList<>();
- int threshold = computer.computeThreshold(streamConfig,
committingSegmentDescriptor, committingSegmentZKMetadata,
- partitionGroupMetadataList, "newSegmentName");
+ int threshold = computer.computeThreshold(streamConfig,
committingSegmentDescriptor, null, "newSegmentName");
// segmentSize * 1.5
// 20000 * 1.5
@@ -86,11 +78,8 @@ public class SegmentFlushThresholdComputerTest {
when(streamConfig.getFlushThresholdSegmentSizeBytes()).thenReturn(segmentSizeBytes);
CommittingSegmentDescriptor committingSegmentDescriptor =
mock(CommittingSegmentDescriptor.class);
- SegmentZKMetadata committingSegmentZKMetadata = null;
- List<PartitionGroupMetadata> partitionGroupMetadataList = new
ArrayList<>();
- int threshold = computer.computeThreshold(streamConfig,
committingSegmentDescriptor, committingSegmentZKMetadata,
- partitionGroupMetadataList, "newSegmentName");
+ int threshold = computer.computeThreshold(streamConfig,
committingSegmentDescriptor, null, "newSegmentName");
assertEquals(threshold, 10000);
}
@@ -110,9 +99,8 @@ public class SegmentFlushThresholdComputerTest {
SegmentZKMetadata committingSegmentZKMetadata =
mock(SegmentZKMetadata.class);
when(committingSegmentZKMetadata.getSizeThresholdToFlushSegment()).thenReturn(segmentSizeThreshold);
- List<PartitionGroupMetadata> partitionGroupMetadataList = new
ArrayList<>();
int threshold = computer.computeThreshold(streamConfig,
committingSegmentDescriptor, committingSegmentZKMetadata,
- partitionGroupMetadataList, "newSegmentName");
+ "newSegmentName");
assertEquals(threshold, segmentSizeThreshold);
}
@@ -137,9 +125,8 @@ public class SegmentFlushThresholdComputerTest {
when(committingSegmentZKMetadata.getCreationTime()).thenReturn(
currentTime - MILLISECONDS.convert(1, TimeUnit.HOURS));
- List<PartitionGroupMetadata> partitionGroupMetadataList = new
ArrayList<>();
int threshold = computer.computeThreshold(streamConfig,
committingSegmentDescriptor, committingSegmentZKMetadata,
- partitionGroupMetadataList, "events3__0__0__20211222T1646Z");
+ "events3__0__0__20211222T1646Z");
// totalDocs * 1.1
// 10000 * 1.1
@@ -166,9 +153,8 @@ public class SegmentFlushThresholdComputerTest {
when(committingSegmentZKMetadata.getCreationTime()).thenReturn(
currentTime - MILLISECONDS.convert(2, TimeUnit.HOURS));
- List<PartitionGroupMetadata> partitionGroupMetadataList = new
ArrayList<>();
int threshold = computer.computeThreshold(streamConfig,
committingSegmentDescriptor, committingSegmentZKMetadata,
- partitionGroupMetadataList, "events3__0__0__20211222T1646Z");
+ "events3__0__0__20211222T1646Z");
// (totalDocs / 2) * 1.1
// (30000 / 2) * 1.1
@@ -190,9 +176,8 @@ public class SegmentFlushThresholdComputerTest {
when(committingSegmentZKMetadata.getTotalDocs()).thenReturn(30_000L);
when(committingSegmentZKMetadata.getSizeThresholdToFlushSegment()).thenReturn(20_000);
- List<PartitionGroupMetadata> partitionGroupMetadataList = new
ArrayList<>();
int threshold = computer.computeThreshold(streamConfig,
committingSegmentDescriptor, committingSegmentZKMetadata,
- partitionGroupMetadataList, "events3__0__0__20211222T1646Z");
+ "events3__0__0__20211222T1646Z");
// totalDocs / 2
// 30000 / 2
@@ -213,9 +198,8 @@ public class SegmentFlushThresholdComputerTest {
when(committingSegmentZKMetadata.getTotalDocs()).thenReturn(30_000L);
when(committingSegmentZKMetadata.getSizeThresholdToFlushSegment()).thenReturn(20_000);
- List<PartitionGroupMetadata> partitionGroupMetadataList = new
ArrayList<>();
int threshold = computer.computeThreshold(streamConfig,
committingSegmentDescriptor, committingSegmentZKMetadata,
- partitionGroupMetadataList, "events3__0__0__20211222T1646Z");
+ "events3__0__0__20211222T1646Z");
// totalDocs + (totalDocs / 2)
// 30000 + (30000 / 2)
@@ -236,9 +220,8 @@ public class SegmentFlushThresholdComputerTest {
when(committingSegmentZKMetadata.getTotalDocs()).thenReturn(30_000L);
when(committingSegmentZKMetadata.getSizeThresholdToFlushSegment()).thenReturn(20_000);
- List<PartitionGroupMetadata> partitionGroupMetadataList = new
ArrayList<>();
int threshold = computer.computeThreshold(streamConfig,
committingSegmentDescriptor, committingSegmentZKMetadata,
- partitionGroupMetadataList, "events3__0__0__20211222T1646Z");
+ "events3__0__0__20211222T1646Z");
// (totalDocs / segmentSize) * flushThresholdSegmentSize
// (30000 / 250000) * 300000
@@ -259,9 +242,8 @@ public class SegmentFlushThresholdComputerTest {
when(committingSegmentZKMetadata.getTotalDocs()).thenReturn(0L);
when(committingSegmentZKMetadata.getSizeThresholdToFlushSegment()).thenReturn(0);
- List<PartitionGroupMetadata> partitionGroupMetadataList = new
ArrayList<>();
int threshold = computer.computeThreshold(streamConfig,
committingSegmentDescriptor, committingSegmentZKMetadata,
- partitionGroupMetadataList, "events3__0__0__20211222T1646Z");
+ "events3__0__0__20211222T1646Z");
// max((totalDocs / segmentSize) * flushThresholdSegmentSize, 10000)
// max(0, 10000)
@@ -282,17 +264,15 @@ public class SegmentFlushThresholdComputerTest {
when(committingSegmentZKMetadata.getTotalDocs()).thenReturn(30_000L,
50_000L);
when(committingSegmentZKMetadata.getSizeThresholdToFlushSegment()).thenReturn(60_000);
- List<PartitionGroupMetadata> partitionGroupMetadataList = new
ArrayList<>();
-
computer.computeThreshold(streamConfig, committingSegmentDescriptor,
committingSegmentZKMetadata,
- partitionGroupMetadataList, "events3__0__0__20211222T1646Z");
+ "events3__0__0__20211222T1646Z");
// (totalDocs / segmentSize)
// (30000 / 200000)
assertEquals(computer.getLatestSegmentRowsToSizeRatio(), 0.15);
computer.computeThreshold(streamConfig, committingSegmentDescriptor,
committingSegmentZKMetadata,
- partitionGroupMetadataList, "events3__0__0__20211222T1646Z");
+ "events3__0__0__20211222T1646Z");
// (0.1 * (totalDocs / segmentSize)) + (0.9 * lastRatio)
// (0.1 * (50000 / 200000)) + (0.9 * 0.15)
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
index 77abf423c9..c59c15a028 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
@@ -19,6 +19,9 @@
package org.apache.pinot.core.realtime.impl.fakestream;
import java.io.IOException;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
@@ -40,6 +43,11 @@ public class FakeStreamMetadataProvider implements
StreamMetadataProvider {
return _numPartitions;
}
+ @Override
+ public Set<Integer> fetchPartitionIds(long timeoutMillis) {
+ return IntStream.range(0,
_numPartitions).boxed().collect(Collectors.toSet());
+ }
+
@Override
public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria
offsetCriteria, long timeoutMillis) {
if (offsetCriteria.isSmallest()) {
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
index 3e8928da12..1086c45d99 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
@@ -19,6 +19,7 @@
package org.apache.pinot.plugin.stream.kafka20;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
import java.io.IOException;
import java.time.Clock;
import java.time.Duration;
@@ -26,6 +27,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.commons.collections.CollectionUtils;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
@@ -70,6 +72,23 @@ public class KafkaStreamMetadataProvider extends
KafkaPartitionLevelConnectionHa
}
}
+ @Override
+ public Set<Integer> fetchPartitionIds(long timeoutMillis) {
+ try {
+ List<PartitionInfo> partitionInfos = _consumer.partitionsFor(_topic,
Duration.ofMillis(timeoutMillis));
+ if (CollectionUtils.isEmpty(partitionInfos)) {
+ throw new RuntimeException(String.format("Failed to fetch partition
information for topic: %s", _topic));
+ }
+ Set<Integer> partitionIds =
Sets.newHashSetWithExpectedSize(partitionInfos.size());
+ for (PartitionInfo partitionInfo : partitionInfos) {
+ partitionIds.add(partitionInfo.partition());
+ }
+ return partitionIds;
+ } catch (TimeoutException e) {
+ throw new TransientConsumerException(e);
+ }
+ }
+
@Override
public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria
offsetCriteria, long timeoutMillis) {
Preconditions.checkNotNull(offsetCriteria);
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java
index 788e5e5ed3..1413aa4334 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java
@@ -19,9 +19,11 @@
package org.apache.pinot.plugin.stream.pulsar;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -31,6 +33,7 @@ import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pinot.spi.stream.TransientConsumerException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
@@ -64,12 +67,24 @@ public class PulsarStreamMetadataProvider extends
PulsarPartitionLevelConnection
@Override
public int fetchPartitionCount(long timeoutMillis) {
try {
- return _pulsarClient.getPartitionsForTopic(_topic).get().size();
+ return _pulsarClient.getPartitionsForTopic(_topic).get(timeoutMillis,
TimeUnit.MILLISECONDS).size();
+ } catch (TimeoutException e) {
+ throw new TransientConsumerException(e);
} catch (Exception e) {
- throw new RuntimeException("Cannot fetch partitions for topic: " +
_topic, e);
+ throw new RuntimeException("Failed to fetch partitions for topic: " +
_topic, e);
}
}
+ @Override
+ public Set<Integer> fetchPartitionIds(long timeoutMillis) {
+ int partitionCount = fetchPartitionCount(timeoutMillis);
+ Set<Integer> partitionIds =
Sets.newHashSetWithExpectedSize(partitionCount);
+ for (int i = 0; i < partitionCount; i++) {
+ partitionIds.add(i);
+ }
+ return partitionIds;
+ }
+
/**
* Fetch the messageId and use it as offset.
* If offset criteria is smallest, the message id of earliest record in the
partition is returned.
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
index 94abc3ca6d..78847c0627 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeoutException;
import org.apache.pinot.spi.annotations.InterfaceAudience;
import org.apache.pinot.spi.annotations.InterfaceStability;
@@ -43,6 +44,13 @@ public interface StreamMetadataProvider extends Closeable {
*/
int fetchPartitionCount(long timeoutMillis);
+ /**
+ * Fetches the partition ids for a topic given the stream configs.
+ */
+ default Set<Integer> fetchPartitionIds(long timeoutMillis) {
+ throw new UnsupportedOperationException();
+ }
+
/**
* Fetches the offset for a given partition and offset criteria
* @param offsetCriteria offset criteria to fetch{@link
StreamPartitionMsgOffset}.
@@ -98,5 +106,6 @@ public interface StreamMetadataProvider extends Closeable {
return result;
}
- class UnknownLagState extends PartitionLagState { }
+ class UnknownLagState extends PartitionLagState {
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]