This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch remove_consuming_partition_info
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit f83cbb569448d883250ae6d57d1070e80f5b3657
Author: Xiaotian (Jackie) Jiang <jackie....@gmail.com>
AuthorDate: Mon Sep 21 12:10:37 2020 -0700

    Remove the partition info from the consuming segment ZK metadata
---
 .../segmentpruner/PartitionSegmentPruner.java      |  32 +++-
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  28 +--
 ...PartitionLLCRealtimeClusterIntegrationTest.java | 199 +++++++++++++++++----
 3 files changed, 192 insertions(+), 67 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/PartitionSegmentPruner.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/PartitionSegmentPruner.java
index 8320b30..181bad9 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/PartitionSegmentPruner.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/PartitionSegmentPruner.java
@@ -32,7 +32,7 @@ import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
 import org.apache.pinot.common.request.BrokerRequest;
-import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.CommonConstants.Segment;
 import org.apache.pinot.common.utils.request.FilterQueryTree;
 import org.apache.pinot.common.utils.request.RequestUtils;
 import org.apache.pinot.core.data.partition.PartitionFunction;
@@ -76,17 +76,32 @@ public class PartitionSegmentPruner implements SegmentPruner {
     List<ZNRecord> znRecords = _propertyStore.get(segmentZKMetadataPaths, null, AccessOption.PERSISTENT);
     for (int i = 0; i < numSegments; i++) {
       String segment = segments.get(i);
-      _partitionInfoMap.put(segment, extractPartitionInfoFromSegmentZKMetadataZNRecord(segment, znRecords.get(i)));
+      PartitionInfo partitionInfo = extractPartitionInfoFromSegmentZKMetadataZNRecord(segment, znRecords.get(i));
+      if (partitionInfo != null) {
+        _partitionInfoMap.put(segment, partitionInfo);
+      }
     }
   }
 
+  /**
+   * NOTE: Returns {@code null} when the ZNRecord is missing (could be a transient Helix issue), or the segment is a
+   *       consuming segment, so that we can retry later. Returns {@link #INVALID_PARTITION_INFO} when the segment does
+   *       not have valid partition metadata in its ZK metadata, in which case we won't retry later.
+   */
+  @Nullable
   private PartitionInfo extractPartitionInfoFromSegmentZKMetadataZNRecord(String segment, @Nullable ZNRecord znRecord) {
     if (znRecord == null) {
       LOGGER.warn("Failed to find segment ZK metadata for segment: {}, table: {}", segment, _tableNameWithType);
-      return INVALID_PARTITION_INFO;
+      return null;
     }
 
-    String partitionMetadataJson = znRecord.getSimpleField(CommonConstants.Segment.PARTITION_METADATA);
+    // Skip processing the partition metadata for the consuming segment because the partition metadata is updated when
+    // the consuming segment is committed
+    if (Segment.Realtime.Status.IN_PROGRESS.name().equals(znRecord.getSimpleField(Segment.Realtime.STATUS))) {
+      return null;
+    }
+
+    String partitionMetadataJson = znRecord.getSimpleField(Segment.PARTITION_METADATA);
     if (partitionMetadataJson == null) {
       LOGGER.warn("Failed to find segment partition metadata for segment: {}, table: {}", segment, _tableNameWithType);
       return INVALID_PARTITION_INFO;
@@ -127,8 +142,13 @@ public class PartitionSegmentPruner implements SegmentPruner {
 
   @Override
   public synchronized void refreshSegment(String segment) {
-    _partitionInfoMap.put(segment, extractPartitionInfoFromSegmentZKMetadataZNRecord(segment,
-        _propertyStore.get(_segmentZKMetadataPathPrefix + segment, null, AccessOption.PERSISTENT)));
+    PartitionInfo partitionInfo = extractPartitionInfoFromSegmentZKMetadataZNRecord(segment,
+        _propertyStore.get(_segmentZKMetadataPathPrefix + segment, null, AccessOption.PERSISTENT));
+    if (partitionInfo != null) {
+      _partitionInfoMap.put(segment, partitionInfo);
+    } else {
+      _partitionInfoMap.remove(segment);
+    }
   }
 
   @Override
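
The change above gives extractPartitionInfoFromSegmentZKMetadataZNRecord a three-way contract: a
valid PartitionInfo, the INVALID_PARTITION_INFO sentinel, or null. A minimal standalone sketch of
how a caller is expected to treat each outcome (hypothetical names, not code from this patch):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Tri-state contract:
    //   null     -> transient (missing ZNRecord, or consuming segment): drop the entry, retry later
    //   INVALID  -> permanent (no valid partition metadata): cache the sentinel, never prune
    //   valid    -> cache and use for pruning
    class PartitionInfoCacheSketch {
      static final Object INVALID_PARTITION_INFO = new Object(); // stands in for the pruner's sentinel
      final Map<String, Object> _partitionInfoMap = new ConcurrentHashMap<>();

      void onRefresh(String segment, Object extracted) {
        if (extracted != null) {
          _partitionInfoMap.put(segment, extracted); // valid info or the INVALID sentinel
        } else {
          _partitionInfoMap.remove(segment); // transient miss: a later refresh can retry
        }
      }
    }

Because the entry is removed rather than overwritten on a transient miss, a consuming segment falls
through to "not prunable" until its committed metadata shows up.
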
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 43ea74c..b85bdb6 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -27,7 +27,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.TreeMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -69,8 +68,6 @@ import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpd
 import org.apache.pinot.controller.util.SegmentCompletionUtils;
 import org.apache.pinot.core.segment.index.metadata.ColumnMetadata;
 import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
-import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
-import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import org.apache.pinot.spi.filesystem.PinotFS;
@@ -552,12 +549,8 @@ public class PinotLLCRealtimeSegmentManager {
     newSegmentZKMetadata.setNumReplicas(numReplicas);
     newSegmentZKMetadata.setStatus(Status.IN_PROGRESS);
 
-    // Add the partition metadata if available
-    SegmentPartitionMetadata partitionMetadata =
-        getPartitionMetadataFromTableConfig(tableConfig, newLLCSegmentName.getPartitionId());
-    if (partitionMetadata != null) {
-      newSegmentZKMetadata.setPartitionMetadata(partitionMetadata);
-    }
+    // NOTE: DO NOT add the partition metadata for the consuming segment to prevent mis-pruning the segment when the
+    //       stream is not partitioned properly.
 
     // Update the flush threshold
     FlushThresholdUpdater flushThresholdUpdater = _flushThresholdUpdateManager.getFlushThresholdUpdater(streamConfig);
@@ -568,23 +561,6 @@ public class PinotLLCRealtimeSegmentManager {
   }
 
   @Nullable
-  private SegmentPartitionMetadata getPartitionMetadataFromTableConfig(TableConfig tableConfig, int partitionId) {
-    SegmentPartitionConfig partitionConfig = tableConfig.getIndexingConfig().getSegmentPartitionConfig();
-    if (partitionConfig == null) {
-      return null;
-    }
-    Map<String, ColumnPartitionMetadata> partitionMetadataMap = new TreeMap<>();
-    for (Map.Entry<String, ColumnPartitionConfig> entry : partitionConfig.getColumnPartitionMap().entrySet()) {
-      String columnName = entry.getKey();
-      ColumnPartitionConfig columnPartitionConfig = entry.getValue();
-      partitionMetadataMap.put(columnName,
-          new ColumnPartitionMetadata(columnPartitionConfig.getFunctionName(), columnPartitionConfig.getNumPartitions(),
-              Collections.singleton(partitionId)));
-    }
-    return new SegmentPartitionMetadata(partitionMetadataMap);
-  }
-
-  @Nullable
   private SegmentPartitionMetadata getPartitionMetadataFromSegmentMetadata(SegmentMetadataImpl segmentMetadata) {
     Map<String, ColumnPartitionMetadata> partitionMetadataMap = new HashMap<>();
     for (Map.Entry<String, ColumnMetadata> entry : segmentMetadata.getColumnMetadataMap().entrySet()) {
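
With getPartitionMetadataFromTableConfig removed, partition metadata for a completed segment comes
only from getPartitionMetadataFromSegmentMetadata, i.e. from the partitions actually observed in the
ingested rows rather than from the table config. A minimal sketch of that derivation, using a
stand-in hash in place of Pinot's configured partition function (assumed shapes, not this patch):

    import java.util.HashSet;
    import java.util.Set;

    // At commit time the partition set reflects the values actually ingested, so a
    // mis-partitioned stream yields e.g. {0, 1} instead of the single stream partition.
    class CommitTimeDerivationSketch {
      static Set<Integer> observedPartitions(Iterable<String> ingestedValues, int numPartitions) {
        Set<Integer> partitions = new HashSet<>();
        for (String value : ingestedValues) {
          partitions.add(partitionOf(value, numPartitions));
        }
        return partitions;
      }

      private static int partitionOf(String value, int numPartitions) {
        return (value.hashCode() & Integer.MAX_VALUE) % numPartitions; // stand-in, NOT murmur
      }
    }

This is exactly the case the integration test below exercises: once the second Avro file is pushed
without partitioning, the new completed segments carry both partitions in their metadata.
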
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
index c520599..b049dbb 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.integration.tests;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import java.io.File;
 import java.util.Arrays;
 import java.util.Collections;
@@ -30,6 +31,7 @@ import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
 import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
 import org.apache.pinot.common.utils.CommonConstants.Segment.Realtime.Status;
+import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
 import org.apache.pinot.spi.config.table.IndexingConfig;
@@ -45,6 +47,7 @@ import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 
 
@@ -52,8 +55,14 @@ import static org.testng.Assert.assertTrue;
  * Integration test that enables segment partition for the LLC real-time table.
  */
 public class SegmentPartitionLLCRealtimeClusterIntegrationTest extends BaseClusterIntegrationTest {
-  // Number of documents in the first Avro file
-  private static final long NUM_DOCS = 9292;
+  private static final String PARTITION_COLUMN = "Carrier";
+  // Number of documents in the first and second Avro file
+  private static final long NUM_DOCS_IN_FIRST_AVRO_FILE = 9292;
+  private static final long NUM_DOCS_IN_SECOND_AVRO_FILE = 8736;
+
+  private List<File> _avroFiles;
+  private String _partitionColumn;
+  private long _countStarResult;
 
   @BeforeClass
   public void setUp()
@@ -70,32 +79,34 @@ public class SegmentPartitionLLCRealtimeClusterIntegrationTest extends BaseClust
     startKafka();
 
     // Unpack the Avro files
-    List<File> avroFiles = unpackAvroData(_tempDir);
+    _avroFiles = unpackAvroData(_tempDir);
 
     // Create and upload the schema and table config with reduced number of columns and partition config
-    Schema schema =
-        new Schema.SchemaBuilder().setSchemaName(getSchemaName()).addSingleValueDimension("Carrier", DataType.STRING)
-            .addDateTime("DaysSinceEpoch", DataType.INT, "1:DAYS:EPOCH", "1:DAYS").build();
+    Schema schema = new Schema.SchemaBuilder().setSchemaName(getSchemaName())
+        .addSingleValueDimension(PARTITION_COLUMN, DataType.STRING)
+        .addDateTime("DaysSinceEpoch", DataType.INT, "1:DAYS:EPOCH", "1:DAYS").build();
     addSchema(schema);
 
-    TableConfig tableConfig = createRealtimeTableConfig(avroFiles.get(0));
+    TableConfig tableConfig = createRealtimeTableConfig(_avroFiles.get(0));
     IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
     indexingConfig.setSegmentPartitionConfig(
-        new SegmentPartitionConfig(Collections.singletonMap("Carrier", new 
ColumnPartitionConfig("murmur", 5))));
+        new SegmentPartitionConfig(Collections.singletonMap(PARTITION_COLUMN, 
new ColumnPartitionConfig("murmur", 5))));
     tableConfig.setRoutingConfig(
         new RoutingConfig(null, 
Collections.singletonList(RoutingConfig.PARTITION_SEGMENT_PRUNER_TYPE), null));
     addTableConfig(tableConfig);
 
     // Push data into Kafka (only ingest the first Avro file)
-    pushAvroIntoKafka(Collections.singletonList(avroFiles.get(0)));
+    _partitionColumn = PARTITION_COLUMN;
+    pushAvroIntoKafka(Collections.singletonList(_avroFiles.get(0)));
 
     // Wait for all documents loaded
+    _countStarResult = NUM_DOCS_IN_FIRST_AVRO_FILE;
     waitForAllDocsLoaded(600_000L);
   }
 
   @Override
   protected long getCountStarResult() {
-    return NUM_DOCS;
+    return _countStarResult;
   }
 
   @Override
@@ -105,6 +116,12 @@ public class SegmentPartitionLLCRealtimeClusterIntegrationTest extends BaseClust
 
   @Nullable
   @Override
+  protected String getPartitionColumn() {
+    return _partitionColumn;
+  }
+
+  @Nullable
+  @Override
   protected List<String> getInvertedIndexColumns() {
     return null;
   }
@@ -129,47 +146,159 @@ public class SegmentPartitionLLCRealtimeClusterIntegrationTest extends BaseClust
 
   @Test
   public void testPartitionMetadata() {
+    int[] numCompletedSegmentsForPartition = new int[2];
     List<RealtimeSegmentZKMetadata> segmentZKMetadataList =
         _helixResourceManager.getRealtimeSegmentMetadata(getTableName());
     for (RealtimeSegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
-      SegmentPartitionMetadata segmentPartitionMetadata = segmentZKMetadata.getPartitionMetadata();
-      assertNotNull(segmentPartitionMetadata);
-      Map<String, ColumnPartitionMetadata> columnPartitionMetadataMap =
-          segmentPartitionMetadata.getColumnPartitionMap();
-      assertEquals(columnPartitionMetadataMap.size(), 1);
-      ColumnPartitionMetadata columnPartitionMetadata = columnPartitionMetadataMap.get("Carrier");
-      assertNotNull(columnPartitionMetadata);
+      if (segmentZKMetadata.getStatus() == Status.IN_PROGRESS) {
+        // For consuming segment, there should be no partition metadata
+        assertNull(segmentZKMetadata.getPartitionMetadata());
+      } else {
+        // Completed segment
 
-      // The function name should be aligned with the partition config in the table config
-      assertTrue(columnPartitionMetadata.getFunctionName().equalsIgnoreCase("murmur"));
+        SegmentPartitionMetadata segmentPartitionMetadata = segmentZKMetadata.getPartitionMetadata();
+        assertNotNull(segmentPartitionMetadata);
+        Map<String, ColumnPartitionMetadata> columnPartitionMetadataMap =
+            segmentPartitionMetadata.getColumnPartitionMap();
+        assertEquals(columnPartitionMetadataMap.size(), 1);
+        ColumnPartitionMetadata columnPartitionMetadata = columnPartitionMetadataMap.get(PARTITION_COLUMN);
+        assertNotNull(columnPartitionMetadata);
 
-      if (segmentZKMetadata.getStatus() == Status.IN_PROGRESS) {
-        // Consuming segment
+        // The function name should be aligned with the partition config in the table config
+        assertTrue(columnPartitionMetadata.getFunctionName().equalsIgnoreCase("murmur"));
+
+        // Number of partitions should be the same as number of stream partitions
+        assertEquals(columnPartitionMetadata.getNumPartitions(), 2);
+
+        // Should contain only one partition, which is the same as the stream partition
+        int streamPartition = new LLCSegmentName(segmentZKMetadata.getSegmentName()).getPartitionId();
+        assertEquals(columnPartitionMetadata.getPartitions(), Collections.singleton(streamPartition));
+
+        numCompletedSegmentsForPartition[streamPartition]++;
+      }
+    }
+
+    // There should be 0 completed segments for partition 0, 2 completed segments for partition 1
+    assertEquals(numCompletedSegmentsForPartition[0], 0);
+    assertEquals(numCompletedSegmentsForPartition[1], 2);
+  }
+
+  @Test(dependsOnMethods = "testPartitionMetadata")
+  public void testPartitionRouting()
+      throws Exception {
+    // Query partition 0
+    {
+      String query = "SELECT COUNT(*) FROM mytable WHERE Carrier = 'UA'";
+      JsonNode response = postQuery(query);
+
+      String queryToCompare = "SELECT COUNT(*) FROM mytable WHERE Carrier 
BETWEEN 'UA' AND 'UA'";
+      JsonNode responseToCompare = postQuery(queryToCompare);
+
+      // Should only query the consuming segment for both partition 0 and 
partition 1
+      assertEquals(response.get(DataTable.NUM_SEGMENTS_QUERIED).asInt(), 2);
+      
assertEquals(responseToCompare.get(DataTable.NUM_SEGMENTS_QUERIED).asInt(), 4);
+
+      
assertEquals(response.get("aggregationResults").get(0).get("value").asInt(),
+          
responseToCompare.get("aggregationResults").get(0).get("value").asInt());
+    }
+
+    // Query partition 1
+    {
+      String query = "SELECT COUNT(*) FROM mytable WHERE Carrier = 'AA'";
+      JsonNode response = postQuery(query);
+
+      String queryToCompare = "SELECT COUNT(*) FROM mytable WHERE Carrier 
BETWEEN 'AA' AND 'AA'";
+      JsonNode responseToCompare = postQuery(queryToCompare);
 
-        // Number of partitions should be aligned with the partition config in 
the table config
-        assertEquals(columnPartitionMetadata.getNumPartitions(), 5);
+      // Should query all the segments
+      assertEquals(response.get(DataTable.NUM_SEGMENTS_QUERIED).asInt(), 4);
+      
assertEquals(responseToCompare.get(DataTable.NUM_SEGMENTS_QUERIED).asInt(), 4);
 
-        // Should contain only the stream partition
-        assertEquals(columnPartitionMetadata.getPartitions(),
-            Collections.singleton(new 
LLCSegmentName(segmentZKMetadata.getSegmentName()).getPartitionId()));
+      
assertEquals(response.get("aggregationResults").get(0).get("value").asInt(),
+          
responseToCompare.get("aggregationResults").get(0).get("value").asInt());
+    }
+  }
+
+  @Test(dependsOnMethods = "testPartitionRouting")
+  public void testNonPartitionedStream()
+      throws Exception {
+    // Push the second Avro file into Kafka without partitioning
+    _partitionColumn = null;
+    pushAvroIntoKafka(Collections.singletonList(_avroFiles.get(1)));
+
+    // Wait for all documents loaded
+    _countStarResult = NUM_DOCS_IN_FIRST_AVRO_FILE + NUM_DOCS_IN_SECOND_AVRO_FILE;
+    waitForAllDocsLoaded(600_000L);
+
+    // Check partition metadata
+    List<RealtimeSegmentZKMetadata> segmentZKMetadataList =
+        _helixResourceManager.getRealtimeSegmentMetadata(getTableName());
+    for (RealtimeSegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
+      if (segmentZKMetadata.getStatus() == Status.IN_PROGRESS) {
+        // For consuming segment, there should be no partition metadata
+        assertNull(segmentZKMetadata.getPartitionMetadata());
       } else {
         // Completed segment
 
+        SegmentPartitionMetadata segmentPartitionMetadata = segmentZKMetadata.getPartitionMetadata();
+        assertNotNull(segmentPartitionMetadata);
+        Map<String, ColumnPartitionMetadata> columnPartitionMetadataMap =
+            segmentPartitionMetadata.getColumnPartitionMap();
+        assertEquals(columnPartitionMetadataMap.size(), 1);
+        ColumnPartitionMetadata columnPartitionMetadata = columnPartitionMetadataMap.get(PARTITION_COLUMN);
+        assertNotNull(columnPartitionMetadata);
+
+        // The function name should be aligned with the partition config in the table config
+        assertTrue(columnPartitionMetadata.getFunctionName().equalsIgnoreCase("murmur"));
+
         // Number of partitions should be the same as number of stream partitions
         assertEquals(columnPartitionMetadata.getNumPartitions(), 2);
 
-        // Should contain the partitions based on the ingested records. Since the records are not partitioned in Kafka,
-        // it should contain all the partitions.
-        assertEquals(columnPartitionMetadata.getPartitions(), new HashSet<>(Arrays.asList(0, 1)));
+        // The partition metadata for the new completed segments should contain both partitions
+        LLCSegmentName llcSegmentName = new LLCSegmentName(segmentZKMetadata.getSegmentName());
+        int streamPartition = llcSegmentName.getPartitionId();
+        int sequenceNumber = llcSegmentName.getSequenceNumber();
+        if (streamPartition == 0 || (streamPartition == 1 && sequenceNumber >= 2)) {
+          assertEquals(columnPartitionMetadata.getPartitions(), new HashSet<>(Arrays.asList(0, 1)));
+        }
       }
     }
-  }
 
-  // TODO: Add test on partition routing once the consuming segment behavior is fixed.
-  //       Currently the partition info is cached in the PartitionSegmentPruner, and won't be reloaded when the
-  //       consuming segment gets committed. The segment will be pruned based on the consuming segment partition info
-  //       (using stream partition as the segment partition), even if the partition info changed for the completed
-  //       segment.
+    // Check partition routing
+    int numSegments = segmentZKMetadataList.size();
+
+    // Query partition 0
+    {
+      String query = "SELECT COUNT(*) FROM mytable WHERE Carrier = 'UA'";
+      JsonNode response = postQuery(query);
+
+      String queryToCompare = "SELECT COUNT(*) FROM mytable WHERE Carrier BETWEEN 'UA' AND 'UA'";
+      JsonNode responseToCompare = postQuery(queryToCompare);
+
+      // Should skip the first 2 completed segments for partition 1
+      assertEquals(response.get(DataTable.NUM_SEGMENTS_QUERIED).asInt(), numSegments - 2);
+      assertEquals(responseToCompare.get(DataTable.NUM_SEGMENTS_QUERIED).asInt(), numSegments);
+
+      assertEquals(response.get("aggregationResults").get(0).get("value").asInt(),
+          responseToCompare.get("aggregationResults").get(0).get("value").asInt());
+    }
+
+    // Query partition 1
+    {
+      String query = "SELECT COUNT(*) FROM mytable WHERE Carrier = 'AA'";
+      JsonNode response = postQuery(query);
+
+      String queryToCompare = "SELECT COUNT(*) FROM mytable WHERE Carrier BETWEEN 'AA' AND 'AA'";
+      JsonNode responseToCompare = postQuery(queryToCompare);
+
+      // Should query all the segments
+      assertEquals(response.get(DataTable.NUM_SEGMENTS_QUERIED).asInt(), numSegments);
+      assertEquals(responseToCompare.get(DataTable.NUM_SEGMENTS_QUERIED).asInt(), numSegments);
+
+      assertEquals(response.get("aggregationResults").get(0).get("value").asInt(),
+          responseToCompare.get("aggregationResults").get(0).get("value").asInt());
+    }
+  }
 
   @AfterClass
   public void tearDown()

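The NUM_SEGMENTS_QUERIED assertions above rely on the pruning rule that only an equality predicate
on the partition column maps to a single partition id; a range predicate such as BETWEEN cannot be
mapped, so it queries every segment and serves as the unpruned baseline. A simplified sketch of the
decision (assumed logic, with a stand-in hash instead of the murmur partition function):

    import java.util.Set;

    // Prune a segment only when the equality literal's partition is absent from the segment's
    // partition metadata; segments with no cached partition info (e.g. consuming segments after
    // this patch) are never pruned.
    final class PruningDecisionSketch {
      static boolean canPrune(String equalityLiteral, int numPartitions, Set<Integer> segmentPartitions) {
        if (segmentPartitions == null) {
          return false; // no partition info cached: always query this segment
        }
        int partition = partitionOf(equalityLiteral, numPartitions);
        return !segmentPartitions.contains(partition);
      }

      private static int partitionOf(String value, int numPartitions) {
        return (value.hashCode() & Integer.MAX_VALUE) % numPartitions; // stand-in, NOT murmur
      }
    }

Under this rule, Carrier = 'UA' touches only the two consuming segments (partition 0 has no
completed segments yet), while the BETWEEN baseline touches all four.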
