This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch remove_consuming_partition_info in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 200bef6ecd05c177b829b0b6ab1995916c13a7c6 Author: Xiaotian (Jackie) Jiang <[email protected]> AuthorDate: Thu Sep 17 16:12:24 2020 -0700 Remove the partition info from the consuming segment ZK metadata --- .../realtime/PinotLLCRealtimeSegmentManager.java | 41 ++-------------------- ...PartitionLLCRealtimeClusterIntegrationTest.java | 39 ++++++++++---------- 2 files changed, 22 insertions(+), 58 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index d93072a..5d47b1a 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -27,7 +27,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.TreeMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -69,8 +68,6 @@ import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpd import org.apache.pinot.controller.util.SegmentCompletionUtils; import org.apache.pinot.core.segment.index.metadata.ColumnMetadata; import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl; -import org.apache.pinot.spi.config.table.ColumnPartitionConfig; -import org.apache.pinot.spi.config.table.SegmentPartitionConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; import org.apache.pinot.spi.filesystem.PinotFS; @@ -550,12 +547,8 @@ public class PinotLLCRealtimeSegmentManager { newSegmentZKMetadata.setNumReplicas(numReplicas); newSegmentZKMetadata.setStatus(Status.IN_PROGRESS); - // Add the partition metadata if available - SegmentPartitionMetadata partitionMetadata = - getPartitionMetadataFromTableConfig(tableConfig, numPartitions, newLLCSegmentName.getPartitionId()); - if (partitionMetadata != null) { - newSegmentZKMetadata.setPartitionMetadata(partitionMetadata); - } + // NOTE: DO NOT add the partition metadata for the consuming segment to prevent mis-pruning the segment when the + // stream is not partitioned properly // Update the flush threshold FlushThresholdUpdater flushThresholdUpdater = _flushThresholdUpdateManager.getFlushThresholdUpdater(streamConfig); @@ -566,36 +559,6 @@ public class PinotLLCRealtimeSegmentManager { } @Nullable - private SegmentPartitionMetadata getPartitionMetadataFromTableConfig(TableConfig tableConfig, int numPartitions, - int partitionId) { - SegmentPartitionConfig partitionConfig = tableConfig.getIndexingConfig().getSegmentPartitionConfig(); - if (partitionConfig == null) { - return null; - } - Map<String, ColumnPartitionMetadata> partitionMetadataMap = new TreeMap<>(); - for (Map.Entry<String, ColumnPartitionConfig> entry : partitionConfig.getColumnPartitionMap().entrySet()) { - String columnName = entry.getKey(); - ColumnPartitionConfig columnPartitionConfig = entry.getValue(); - - // NOTE: Here we compare the number of partitions from the config and the stream, and log a warning when they - // don't match, but use the one from the stream. The mismatch could happen when the stream partitions are - // changed, but the table config has not been updated to reflect the change. In such case, picking the - // number of partitions from the stream can keep the segment properly partitioned as long as the partition - // function is not changed. - if (columnPartitionConfig.getNumPartitions() != numPartitions) { - LOGGER.warn( - "Number of stream partitions: {} does not match number of partitions in the partition config: {}, using number of stream partitions", - numPartitions, columnPartitionConfig.getNumPartitions()); - } - - partitionMetadataMap.put(columnName, - new ColumnPartitionMetadata(columnPartitionConfig.getFunctionName(), numPartitions, - Collections.singleton(partitionId))); - } - return new SegmentPartitionMetadata(partitionMetadataMap); - } - - @Nullable private SegmentPartitionMetadata getPartitionMetadataFromSegmentMetadata(SegmentMetadataImpl segmentMetadata) { Map<String, ColumnPartitionMetadata> partitionMetadataMap = new HashMap<>(); for (Map.Entry<String, ColumnMetadata> entry : segmentMetadata.getColumnMetadataMap().entrySet()) { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java index b98dd7d..0866131 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java @@ -30,7 +30,6 @@ import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata; import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata; import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata; import org.apache.pinot.common.utils.CommonConstants.Segment.Realtime.Status; -import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.spi.config.table.ColumnPartitionConfig; import org.apache.pinot.spi.config.table.IndexingConfig; import org.apache.pinot.spi.config.table.SegmentPartitionConfig; @@ -44,6 +43,7 @@ import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; @@ -129,27 +129,28 @@ public class SegmentPartitionLLCRealtimeClusterIntegrationTest extends BaseClust List<RealtimeSegmentZKMetadata> segmentZKMetadataList = _helixResourceManager.getRealtimeSegmentMetadata(getTableName()); for (RealtimeSegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) { - SegmentPartitionMetadata segmentPartitionMetadata = segmentZKMetadata.getPartitionMetadata(); - assertNotNull(segmentPartitionMetadata); - Map<String, ColumnPartitionMetadata> columnPartitionMetadataMap = - segmentPartitionMetadata.getColumnPartitionMap(); - assertEquals(columnPartitionMetadataMap.size(), 1); - ColumnPartitionMetadata columnPartitionMetadata = columnPartitionMetadataMap.get("Carrier"); - assertNotNull(columnPartitionMetadata); - - // The function name should be aligned with the partition config in the table config - assertTrue(columnPartitionMetadata.getFunctionName().equalsIgnoreCase("murmur")); - - // Number of partitions should be the same as number of stream partitions - assertEquals(columnPartitionMetadata.getNumPartitions(), 2); if (segmentZKMetadata.getStatus() == Status.IN_PROGRESS) { - // For consuming segment, should contain only the stream partition - assertEquals(columnPartitionMetadata.getPartitions(), - Collections.singleton(new LLCSegmentName(segmentZKMetadata.getSegmentName()).getPartitionId())); + // For consuming segment, there should be no partition metadata + assertNull(segmentZKMetadata.getPartitionMetadata()); } else { - // For completed segment, should contain the partitions based on the ingested records. Since the records are not - // partitioned in Kafka, it should contain all partitions. + // Completed segment + + SegmentPartitionMetadata segmentPartitionMetadata = segmentZKMetadata.getPartitionMetadata(); + assertNotNull(segmentPartitionMetadata); + Map<String, ColumnPartitionMetadata> columnPartitionMetadataMap = + segmentPartitionMetadata.getColumnPartitionMap(); + assertEquals(columnPartitionMetadataMap.size(), 1); + ColumnPartitionMetadata columnPartitionMetadata = columnPartitionMetadataMap.get("Carrier"); + assertNotNull(columnPartitionMetadata); + + // The function name should be aligned with the partition config in the table config + assertTrue(columnPartitionMetadata.getFunctionName().equalsIgnoreCase("murmur")); + + // Number of partitions should be the same as number of stream partitions + assertEquals(columnPartitionMetadata.getNumPartitions(), 2); + + // Should contain all partitions as the records are not partitioned in Kafka assertEquals(columnPartitionMetadata.getPartitions(), new HashSet<>(Arrays.asList(0, 1))); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
