This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch remove_consuming_partition_info
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit f83cbb569448d883250ae6d57d1070e80f5b3657
Author: Xiaotian (Jackie) Jiang <jackie....@gmail.com>
AuthorDate: Mon Sep 21 12:10:37 2020 -0700

    Remove the partition info from the consuming segment ZK metadata
---
 .../segmentpruner/PartitionSegmentPruner.java      |  32 +++-
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  28 +--
 ...PartitionLLCRealtimeClusterIntegrationTest.java | 199 +++++++++++++++++----
 3 files changed, 192 insertions(+), 67 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/PartitionSegmentPruner.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/PartitionSegmentPruner.java
index 8320b30..181bad9 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/PartitionSegmentPruner.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/PartitionSegmentPruner.java
@@ -32,7 +32,7 @@ import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
 import org.apache.pinot.common.request.BrokerRequest;
-import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.CommonConstants.Segment;
 import org.apache.pinot.common.utils.request.FilterQueryTree;
 import org.apache.pinot.common.utils.request.RequestUtils;
 import org.apache.pinot.core.data.partition.PartitionFunction;
@@ -76,17 +76,32 @@ public class PartitionSegmentPruner implements SegmentPruner {
     List<ZNRecord> znRecords = _propertyStore.get(segmentZKMetadataPaths, null, AccessOption.PERSISTENT);
     for (int i = 0; i < numSegments; i++) {
       String segment = segments.get(i);
-      _partitionInfoMap.put(segment, extractPartitionInfoFromSegmentZKMetadataZNRecord(segment, znRecords.get(i)));
+      PartitionInfo partitionInfo = extractPartitionInfoFromSegmentZKMetadataZNRecord(segment, znRecords.get(i));
+      if (partitionInfo != null) {
+        _partitionInfoMap.put(segment, partitionInfo);
+      }
     }
   }
 
+  /**
+   * NOTE: Returns {@code null} when the ZNRecord is missing (could be a transient Helix issue) or the segment is a
+   * consuming segment, so that we can retry later. Returns {@link #INVALID_PARTITION_INFO} when the segment does
+   * not have valid partition metadata in its ZK metadata, in which case we won't retry later.
+   */
+  @Nullable
   private PartitionInfo extractPartitionInfoFromSegmentZKMetadataZNRecord(String segment,
       @Nullable ZNRecord znRecord) {
     if (znRecord == null) {
       LOGGER.warn("Failed to find segment ZK metadata for segment: {}, table: {}", segment, _tableNameWithType);
-      return INVALID_PARTITION_INFO;
+      return null;
     }
 
-    String partitionMetadataJson = znRecord.getSimpleField(CommonConstants.Segment.PARTITION_METADATA);
+    // Skip processing the partition metadata for the consuming segment because the partition metadata is updated when
+    // the consuming segment is committed
+    if (Segment.Realtime.Status.IN_PROGRESS.name().equals(znRecord.getSimpleField(Segment.Realtime.STATUS))) {
+      return null;
+    }
+
+    String partitionMetadataJson = znRecord.getSimpleField(Segment.PARTITION_METADATA);
     if (partitionMetadataJson == null) {
       LOGGER.warn("Failed to find segment partition metadata for segment: {}, table: {}", segment, _tableNameWithType);
       return INVALID_PARTITION_INFO;
@@ -127,8 +142,13 @@ public class PartitionSegmentPruner implements SegmentPruner {
 
   @Override
   public synchronized void refreshSegment(String segment) {
-    _partitionInfoMap.put(segment, extractPartitionInfoFromSegmentZKMetadataZNRecord(segment,
-        _propertyStore.get(_segmentZKMetadataPathPrefix + segment, null, AccessOption.PERSISTENT)));
+    PartitionInfo partitionInfo = extractPartitionInfoFromSegmentZKMetadataZNRecord(segment,
+        _propertyStore.get(_segmentZKMetadataPathPrefix + segment, null, AccessOption.PERSISTENT));
+    if (partitionInfo != null) {
+      _partitionInfoMap.put(segment, partitionInfo);
+    } else {
+      _partitionInfoMap.remove(segment);
+    }
   }
 
   @Override
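For illustration only (this class is not part of the commit): a minimal sketch of the caller-side contract described by the javadoc above — null means "unknown for now" and is retried, while INVALID_PARTITION_INFO is cached so the segment ZK metadata is not re-read. The class, field and method names are invented stand-ins; the real logic lives in the PartitionSegmentPruner hunks above.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class PartitionInfoCacheSketch {
  // Cached negative result: the segment has no usable partition metadata; never retried.
  static final Object INVALID_PARTITION_INFO = new Object();
  private final Map<String, Object> _partitionInfoMap = new ConcurrentHashMap<>();

  void refresh(String segment, Object extracted) {
    if (extracted != null) {
      // Valid partition info or INVALID_PARTITION_INFO: cache it so the segment ZK
      // metadata is not re-read on every routing decision.
      _partitionInfoMap.put(segment, extracted);
    } else {
      // null = missing ZNRecord or still-consuming segment: drop any stale entry so a
      // later refresh (e.g. when the segment commits) extracts the metadata again.
      _partitionInfoMap.remove(segment);
    }
  }
}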
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 43ea74c..b85bdb6 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -27,7 +27,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.TreeMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -69,8 +68,6 @@ import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpd
 import org.apache.pinot.controller.util.SegmentCompletionUtils;
 import org.apache.pinot.core.segment.index.metadata.ColumnMetadata;
 import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
-import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
-import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import org.apache.pinot.spi.filesystem.PinotFS;
@@ -552,12 +549,8 @@ public class PinotLLCRealtimeSegmentManager {
     newSegmentZKMetadata.setNumReplicas(numReplicas);
     newSegmentZKMetadata.setStatus(Status.IN_PROGRESS);
 
-    // Add the partition metadata if available
-    SegmentPartitionMetadata partitionMetadata =
-        getPartitionMetadataFromTableConfig(tableConfig, newLLCSegmentName.getPartitionId());
-    if (partitionMetadata != null) {
-      newSegmentZKMetadata.setPartitionMetadata(partitionMetadata);
-    }
+    // NOTE: DO NOT add the partition metadata for the consuming segment to prevent mis-pruning the segment when the
+    // stream is not partitioned properly.
 
     // Update the flush threshold
     FlushThresholdUpdater flushThresholdUpdater = _flushThresholdUpdateManager.getFlushThresholdUpdater(streamConfig);
@@ -568,23 +561,6 @@
   }
 
   @Nullable
-  private SegmentPartitionMetadata getPartitionMetadataFromTableConfig(TableConfig tableConfig, int partitionId) {
-    SegmentPartitionConfig partitionConfig = tableConfig.getIndexingConfig().getSegmentPartitionConfig();
-    if (partitionConfig == null) {
-      return null;
-    }
-    Map<String, ColumnPartitionMetadata> partitionMetadataMap = new TreeMap<>();
-    for (Map.Entry<String, ColumnPartitionConfig> entry : partitionConfig.getColumnPartitionMap().entrySet()) {
-      String columnName = entry.getKey();
-      ColumnPartitionConfig columnPartitionConfig = entry.getValue();
-      partitionMetadataMap.put(columnName,
-          new ColumnPartitionMetadata(columnPartitionConfig.getFunctionName(),
-              columnPartitionConfig.getNumPartitions(), Collections.singleton(partitionId)));
-    }
-    return new SegmentPartitionMetadata(partitionMetadataMap);
-  }
-
-  @Nullable
   private SegmentPartitionMetadata getPartitionMetadataFromSegmentMetadata(SegmentMetadataImpl segmentMetadata) {
     Map<String, ColumnPartitionMetadata> partitionMetadataMap = new HashMap<>();
     for (Map.Entry<String, ColumnMetadata> entry : segmentMetadata.getColumnMetadataMap().entrySet()) {
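For illustration only (not part of the commit): with the config-derived stamping removed, partition metadata for completed segments still comes from getPartitionMetadataFromSegmentMetadata(), i.e. from the partitions actually observed in the committed rows. A minimal sketch of that idea, with invented names (ObservedPartitions, observedPartitions):

import java.util.HashSet;
import java.util.Set;
import java.util.function.ToIntFunction;

final class ObservedPartitions {
  // Collect every partition id actually seen in a column's values.
  static Set<Integer> observedPartitions(Iterable<String> columnValues, ToIntFunction<String> partitionFunction) {
    Set<Integer> partitions = new HashSet<>();
    for (String value : columnValues) {
      partitions.add(partitionFunction.applyAsInt(value));
    }
    // For a properly partitioned stream this ends up a singleton; for a mis-partitioned
    // stream it can hold several partitions, which is why stamping the consuming segment
    // with a single config-derived partition was unsafe.
    return partitions;
  }
}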
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
index c520599..b049dbb 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.integration.tests;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import java.io.File;
 import java.util.Arrays;
 import java.util.Collections;
@@ -30,6 +31,7 @@ import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
 import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
 import org.apache.pinot.common.utils.CommonConstants.Segment.Realtime.Status;
+import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
 import org.apache.pinot.spi.config.table.IndexingConfig;
@@ -45,6 +47,7 @@ import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 
 
@@ -52,8 +55,14 @@ import static org.testng.Assert.assertTrue;
  * Integration test that enables segment partition for the LLC real-time table.
  */
 public class SegmentPartitionLLCRealtimeClusterIntegrationTest extends BaseClusterIntegrationTest {
-  // Number of documents in the first Avro file
-  private static final long NUM_DOCS = 9292;
+  private static final String PARTITION_COLUMN = "Carrier";
+  // Number of documents in the first and second Avro files
+  private static final long NUM_DOCS_IN_FIRST_AVRO_FILE = 9292;
+  private static final long NUM_DOCS_IN_SECOND_AVRO_FILE = 8736;
+
+  private List<File> _avroFiles;
+  private String _partitionColumn;
+  private long _countStarResult;
 
   @BeforeClass
   public void setUp()
@@ -70,32 +79,34 @@ public class SegmentPartitionLLCRealtimeClusterIntegrationTest extends BaseClust
     startKafka();
 
     // Unpack the Avro files
-    List<File> avroFiles = unpackAvroData(_tempDir);
+    _avroFiles = unpackAvroData(_tempDir);
 
     // Create and upload the schema and table config with reduced number of columns and partition config
-    Schema schema =
-        new Schema.SchemaBuilder().setSchemaName(getSchemaName()).addSingleValueDimension("Carrier", DataType.STRING)
-            .addDateTime("DaysSinceEpoch", DataType.INT, "1:DAYS:EPOCH", "1:DAYS").build();
+    Schema schema = new Schema.SchemaBuilder().setSchemaName(getSchemaName())
+        .addSingleValueDimension(PARTITION_COLUMN, DataType.STRING)
+        .addDateTime("DaysSinceEpoch", DataType.INT, "1:DAYS:EPOCH", "1:DAYS").build();
     addSchema(schema);
-    TableConfig tableConfig = createRealtimeTableConfig(avroFiles.get(0));
+    TableConfig tableConfig = createRealtimeTableConfig(_avroFiles.get(0));
     IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
     indexingConfig.setSegmentPartitionConfig(
-        new SegmentPartitionConfig(Collections.singletonMap("Carrier", new ColumnPartitionConfig("murmur", 5))));
+        new SegmentPartitionConfig(Collections.singletonMap(PARTITION_COLUMN, new ColumnPartitionConfig("murmur", 5))));
     tableConfig.setRoutingConfig(
         new RoutingConfig(null, Collections.singletonList(RoutingConfig.PARTITION_SEGMENT_PRUNER_TYPE), null));
     addTableConfig(tableConfig);
 
     // Push data into Kafka (only ingest the first Avro file)
-    pushAvroIntoKafka(Collections.singletonList(avroFiles.get(0)));
+    _partitionColumn = PARTITION_COLUMN;
+    pushAvroIntoKafka(Collections.singletonList(_avroFiles.get(0)));
 
     // Wait for all documents loaded
+    _countStarResult = NUM_DOCS_IN_FIRST_AVRO_FILE;
     waitForAllDocsLoaded(600_000L);
   }
 
   @Override
   protected long getCountStarResult() {
-    return NUM_DOCS;
+    return _countStarResult;
   }
 
   @Override
@@ -105,6 +116,12 @@ public class SegmentPartitionLLCRealtimeClusterIntegrationTest extends BaseClust
 
   @Nullable
   @Override
+  protected String getPartitionColumn() {
+    return _partitionColumn;
+  }
+
+  @Nullable
+  @Override
   protected List<String> getInvertedIndexColumns() {
     return null;
   }
@@ -129,47 +146,159 @@ public class SegmentPartitionLLCRealtimeClusterIntegrationTest extends BaseClust
 
   @Test
   public void testPartitionMetadata() {
+    int[] numCompletedSegmentsForPartition = new int[2];
     List<RealtimeSegmentZKMetadata> segmentZKMetadataList =
         _helixResourceManager.getRealtimeSegmentMetadata(getTableName());
     for (RealtimeSegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
-      SegmentPartitionMetadata segmentPartitionMetadata = segmentZKMetadata.getPartitionMetadata();
-      assertNotNull(segmentPartitionMetadata);
-      Map<String, ColumnPartitionMetadata> columnPartitionMetadataMap =
-          segmentPartitionMetadata.getColumnPartitionMap();
-      assertEquals(columnPartitionMetadataMap.size(), 1);
-      ColumnPartitionMetadata columnPartitionMetadata = columnPartitionMetadataMap.get("Carrier");
-      assertNotNull(columnPartitionMetadata);
+      if (segmentZKMetadata.getStatus() == Status.IN_PROGRESS) {
+        // For consuming segment, there should be no partition metadata
+        assertNull(segmentZKMetadata.getPartitionMetadata());
+      } else {
+        // Completed segment
 
-      // The function name should be aligned with the partition config in the table config
-      assertTrue(columnPartitionMetadata.getFunctionName().equalsIgnoreCase("murmur"));
+        SegmentPartitionMetadata segmentPartitionMetadata = segmentZKMetadata.getPartitionMetadata();
+        assertNotNull(segmentPartitionMetadata);
+        Map<String, ColumnPartitionMetadata> columnPartitionMetadataMap =
+            segmentPartitionMetadata.getColumnPartitionMap();
+        assertEquals(columnPartitionMetadataMap.size(), 1);
+        ColumnPartitionMetadata columnPartitionMetadata = columnPartitionMetadataMap.get(PARTITION_COLUMN);
+        assertNotNull(columnPartitionMetadata);
 
-      if (segmentZKMetadata.getStatus() == Status.IN_PROGRESS) {
-        // Consuming segment
+        // The function name should be aligned with the partition config in the table config
+        assertTrue(columnPartitionMetadata.getFunctionName().equalsIgnoreCase("murmur"));
+
+        // Number of partitions should be the same as number of stream partitions
+        assertEquals(columnPartitionMetadata.getNumPartitions(), 2);
+
+        // Should contain only one partition, which is the same as the stream partition
+        int streamPartition = new LLCSegmentName(segmentZKMetadata.getSegmentName()).getPartitionId();
+        assertEquals(columnPartitionMetadata.getPartitions(), Collections.singleton(streamPartition));
+
+        numCompletedSegmentsForPartition[streamPartition]++;
+      }
+    }
+
+    // There should be 0 completed segments for partition 0, 2 completed segments for partition 1
+    assertEquals(numCompletedSegmentsForPartition[0], 0);
+    assertEquals(numCompletedSegmentsForPartition[1], 2);
+  }
+
+  @Test(dependsOnMethods = "testPartitionMetadata")
+  public void testPartitionRouting()
+      throws Exception {
+    // Query partition 0
+    {
+      String query = "SELECT COUNT(*) FROM mytable WHERE Carrier = 'UA'";
+      JsonNode response = postQuery(query);
+
+      String queryToCompare = "SELECT COUNT(*) FROM mytable WHERE Carrier BETWEEN 'UA' AND 'UA'";
+      JsonNode responseToCompare = postQuery(queryToCompare);
+
+      // Should only query the consuming segment for both partition 0 and partition 1
+      assertEquals(response.get(DataTable.NUM_SEGMENTS_QUERIED).asInt(), 2);
+      assertEquals(responseToCompare.get(DataTable.NUM_SEGMENTS_QUERIED).asInt(), 4);
+
+      assertEquals(response.get("aggregationResults").get(0).get("value").asInt(),
+          responseToCompare.get("aggregationResults").get(0).get("value").asInt());
+    }
+
+    // Query partition 1
+    {
+      String query = "SELECT COUNT(*) FROM mytable WHERE Carrier = 'AA'";
+      JsonNode response = postQuery(query);
+
+      String queryToCompare = "SELECT COUNT(*) FROM mytable WHERE Carrier BETWEEN 'AA' AND 'AA'";
+      JsonNode responseToCompare = postQuery(queryToCompare);
 
-      // Number of partitions should be aligned with the partition config in the table config
-      assertEquals(columnPartitionMetadata.getNumPartitions(), 5);
+      // Should query all the segments
+      assertEquals(response.get(DataTable.NUM_SEGMENTS_QUERIED).asInt(), 4);
+      assertEquals(responseToCompare.get(DataTable.NUM_SEGMENTS_QUERIED).asInt(), 4);
 
-      // Should contain only the stream partition
-      assertEquals(columnPartitionMetadata.getPartitions(),
-          Collections.singleton(new LLCSegmentName(segmentZKMetadata.getSegmentName()).getPartitionId()));
assertEquals(response.get("aggregationResults").get(0).get("value").asInt(), + responseToCompare.get("aggregationResults").get(0).get("value").asInt()); + } + } + + @Test(dependsOnMethods = "testPartitionRouting") + public void testNonPartitionedStream() + throws Exception { + // Push the second Avro file into Kafka without partitioning + _partitionColumn = null; + pushAvroIntoKafka(Collections.singletonList(_avroFiles.get(1))); + + // Wait for all documents loaded + _countStarResult = NUM_DOCS_IN_FIRST_AVRO_FILE + NUM_DOCS_IN_SECOND_AVRO_FILE; + waitForAllDocsLoaded(600_000L); + + // Check partition metadata + List<RealtimeSegmentZKMetadata> segmentZKMetadataList = + _helixResourceManager.getRealtimeSegmentMetadata(getTableName()); + for (RealtimeSegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) { + if (segmentZKMetadata.getStatus() == Status.IN_PROGRESS) { + // For consuming segment, there should be no partition metadata + assertNull(segmentZKMetadata.getPartitionMetadata()); } else { // Completed segment + SegmentPartitionMetadata segmentPartitionMetadata = segmentZKMetadata.getPartitionMetadata(); + assertNotNull(segmentPartitionMetadata); + Map<String, ColumnPartitionMetadata> columnPartitionMetadataMap = + segmentPartitionMetadata.getColumnPartitionMap(); + assertEquals(columnPartitionMetadataMap.size(), 1); + ColumnPartitionMetadata columnPartitionMetadata = columnPartitionMetadataMap.get(PARTITION_COLUMN); + assertNotNull(columnPartitionMetadata); + + // The function name should be aligned with the partition config in the table config + assertTrue(columnPartitionMetadata.getFunctionName().equalsIgnoreCase("murmur")); + // Number of partitions should be the same as number of stream partitions assertEquals(columnPartitionMetadata.getNumPartitions(), 2); - // Should contain the partitions based on the ingested records. Since the records are not partitioned in Kafka, - // it should contain all the partitions. - assertEquals(columnPartitionMetadata.getPartitions(), new HashSet<>(Arrays.asList(0, 1))); + // The partition metadata for the new completed segments should contain both partitions + LLCSegmentName llcSegmentName = new LLCSegmentName(segmentZKMetadata.getSegmentName()); + int streamPartition = llcSegmentName.getPartitionId(); + int sequenceNumber = llcSegmentName.getSequenceNumber(); + if (streamPartition == 0 || (streamPartition == 1 && sequenceNumber >= 2)) { + assertEquals(columnPartitionMetadata.getPartitions(), new HashSet<>(Arrays.asList(0, 1))); + } } } - } - // TODO: Add test on partition routing once the consuming segment behavior is fixed. - // Currently the partition info is cached in the PartitionSegmentPruner, and won't be reloaded when the - // consuming segment gets committed. The segment will be pruned based on the consuming segment partition info - // (using stream partition as the segment partition), even if the partition info changed for the completed - // segment. 
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org