This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new dfc2cd06d8 RealtimeSegmentConverter was using incorrect schema (#13877)
dfc2cd06d8 is described below

commit dfc2cd06d86f60a24f5facf2f02e3bd37ae0efc5
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Wed Sep 18 10:30:40 2024 +0200

    RealtimeSegmentConverter was using incorrect schema (#13877)
    
    Change the way the schema is copied in RealtimeSegmentConverter to keep
    column-based nullability
---
 .../converter/RealtimeSegmentConverter.java        |  9 +---
 .../converter/RealtimeSegmentConverterTest.java    | 28 +++++++++++--
 .../java/org/apache/pinot/spi/data/Schema.java     | 38 +++++++++++++++++
 .../java/org/apache/pinot/spi/data/SchemaTest.java | 48 ++++++++++++++++++++++
 .../org/apache/pinot/tools/QuickStartBase.java     |  4 +-
 .../pinot/tools/streams/MeetupRsvpStream.java      |  8 +++-
 .../pinot/tools/streams/RsvpSourceGenerator.java   |  7 +++-
 .../upsertMeetupRsvp_realtime_table_config.json    |  3 +-
 .../upsertMeetupRsvp/upsertMeetupRsvp_schema.json  |  3 +-
 9 files changed, 129 insertions(+), 19 deletions(-)

diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
index 977bba0451..2082f35622 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
@@ -40,7 +40,6 @@ import org.apache.pinot.spi.config.table.IndexConfig;
 import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
 import org.apache.pinot.spi.config.table.SegmentZKPropsConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline;
 
@@ -164,13 +163,7 @@ public class RealtimeSegmentConverter {
    */
   @VisibleForTesting
   public static Schema getUpdatedSchema(Schema original) {
-    Schema newSchema = new Schema();
-    for (FieldSpec fieldSpec : original.getAllFieldSpecs()) {
-      if (!fieldSpec.isVirtualColumn()) {
-        newSchema.addField(fieldSpec);
-      }
-    }
-    return newSchema;
+    return original.withoutVirtualColumns();
   }
 
   public boolean isColumnMajorEnabled() {
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java
index e2dd41dbb9..5e6a2fe555 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java
@@ -29,7 +29,9 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ServerMetrics;
@@ -73,6 +75,7 @@ import org.testng.annotations.Test;
 
 import static org.mockito.Mockito.mock;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertTrue;
 
 
@@ -99,14 +102,31 @@ public class RealtimeSegmentConverterTest {
 
   @Test
   public void testNoVirtualColumnsInSchema() {
-    Schema schema = new Schema.SchemaBuilder().addSingleValueDimension("col1", 
FieldSpec.DataType.STRING)
+    // @formatter:off
+    Schema schema = new Schema.SchemaBuilder()
+        .setSchemaName("someName")
+        .setEnableColumnBasedNullHandling(true)
+        .addSingleValueDimension("col1", FieldSpec.DataType.STRING)
         .addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, 
TimeUnit.MILLISECONDS, "col1"),
-            new TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.DAYS, 
"col2")).build();
+            new TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.DAYS, 
"col2"))
+        .build();
+    // @formatter:on
     String segmentName = "segment1";
     
VirtualColumnProviderFactory.addBuiltInVirtualColumnsToSegmentSchema(schema, 
segmentName);
-    assertEquals(schema.getColumnNames().size(), 5);
+    Set<FieldSpec> initialVirtualColumns = getVirtualColumns(schema);
+    assertNotEquals(initialVirtualColumns, Collections.emptySet(), "Initial 
virtual columns should not be empty");
     Schema newSchema = RealtimeSegmentConverter.getUpdatedSchema(schema);
-    assertEquals(newSchema.getColumnNames().size(), 2);
+    Set<FieldSpec> newVirtualColumns = getVirtualColumns(newSchema);
+    assertEquals(newVirtualColumns, Collections.emptySet(), "Virtual columns 
should be removed");
+    assertEquals(newSchema.getSchemaName(), schema.getSchemaName(), "Schema 
name should be the same");
+    assertEquals(newSchema.isEnableColumnBasedNullHandling(), 
schema.isEnableColumnBasedNullHandling(),
+        "Column based null handling should be the same");
+  }
+
+  private Set<FieldSpec> getVirtualColumns(Schema schema) {
+    return schema.getAllFieldSpecs().stream()
+        .filter(FieldSpec::isVirtualColumn)
+        .collect(Collectors.toSet());
   }
 
   @Test
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
index a7439d5d1f..d154825746 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
@@ -38,6 +38,7 @@ import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
@@ -73,6 +74,7 @@ public final class Schema implements Serializable {
   private final List<ComplexFieldSpec> _complexFieldSpecs = new ArrayList<>();
   // names of the columns that used as primary keys
   // TODO(yupeng): add validation checks like duplicate columns and use of 
time column
+  @Nullable
   private List<String> _primaryKeyColumns;
 
   // Json ignored fields
@@ -176,6 +178,7 @@ public final class Schema implements Serializable {
     _enableColumnBasedNullHandling = enableColumnBasedNullHandling;
   }
 
+  @Nullable
   public List<String> getPrimaryKeyColumns() {
     return _primaryKeyColumns;
   }
@@ -376,6 +379,34 @@ public final class Schema implements Serializable {
     return physicalColumnNames;
   }
 
+  /**
+   * Returns a new schema containing only physical columns.
+   *
+   * All properties but the fields are the same.
+   * All field attributes are a shallow copy without the virtual column.
+   */
+  public Schema withoutVirtualColumns() {
+    Schema newSchema = new Schema();
+    newSchema.setSchemaName(getSchemaName());
+    
newSchema.setEnableColumnBasedNullHandling(isEnableColumnBasedNullHandling());
+    List<String> primaryKeyColumns = getPrimaryKeyColumns();
+    if (primaryKeyColumns != null) {
+      newSchema.setPrimaryKeyColumns(primaryKeyColumns.stream()
+          .filter(primaryKey -> {
+            FieldSpec fieldSpec = _fieldSpecMap.get(primaryKey);
+            return fieldSpec != null && !fieldSpec.isVirtualColumn();
+          })
+          .collect(Collectors.toList())
+      );
+    }
+    for (FieldSpec fieldSpec : getAllFieldSpecs()) {
+      if (!fieldSpec.isVirtualColumn()) {
+        newSchema.addField(fieldSpec);
+      }
+    }
+    return newSchema;
+  }
+
   /**
    * NOTE: The returned FieldSpecs are sorted with the column name 
alphabetically.
    */
@@ -840,10 +871,17 @@ public final class Schema implements Serializable {
     return result;
   }
 
+  /**
+   * @deprecated this method is not correctly implemented. ie doesn't call 
super.clone and does not create a deep copy
+   * of the fieldSpecs.
+   */
+  @Deprecated
+  @Override
   public Schema clone() {
     Schema cloned = new SchemaBuilder()
         .setSchemaName(getSchemaName())
         .setPrimaryKeyColumns(getPrimaryKeyColumns())
+        .setEnableColumnBasedNullHandling(isEnableColumnBasedNullHandling())
         .build();
     getAllFieldSpecs().forEach(fieldSpec -> cloned.addField(fieldSpec));
     return cloned;
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/data/SchemaTest.java 
b/pinot-spi/src/test/java/org/apache/pinot/spi/data/SchemaTest.java
index c4a2aee5be..a70074d08c 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/data/SchemaTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/data/SchemaTest.java
@@ -22,9 +22,11 @@ import com.fasterxml.jackson.databind.JsonNode;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.sql.Timestamp;
+import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.spi.data.TimeGranularitySpec.TimeFormat;
 import org.apache.pinot.spi.utils.BytesUtils;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -646,4 +648,50 @@ public class SchemaTest {
     Assert.assertEquals(newSchema.getFieldSpecFor("svBoolean").getDataType(), 
FieldSpec.DataType.BOOLEAN);
     
Assert.assertEquals(newSchema.getFieldSpecFor("svBooleanWithDefault").getDataType(),
 FieldSpec.DataType.BOOLEAN);
   }
+
+  @Test
+  public void testWithoutVirtualColumns() {
+    DimensionFieldSpec virtualField = new DimensionFieldSpec(
+        CommonConstants.Segment.BuiltInVirtualColumn.DOCID, 
FieldSpec.DataType.INT, true, Integer.class);
+
+    Schema schema = new Schema.SchemaBuilder()
+        .setSchemaName("testSchema")
+        .setEnableColumnBasedNullHandling(true)
+        .addMetricField("metric", FieldSpec.DataType.INT)
+        .addField(virtualField)
+        .setPrimaryKeyColumns(Lists.newArrayList("metric"))
+        .build();
+
+    Schema withoutVirtualColumns = schema.withoutVirtualColumns();
+    Assert.assertTrue(withoutVirtualColumns.getAllFieldSpecs().stream()
+        .noneMatch(field -> 
field.getName().equals(CommonConstants.Segment.BuiltInVirtualColumn.DOCID)),
+        "Virtual column " + CommonConstants.Segment.BuiltInVirtualColumn.DOCID 
+ " should be removed");
+
+    Assert.assertEquals(withoutVirtualColumns.getPrimaryKeyColumns(), 
schema.getPrimaryKeyColumns(),
+        "Primary key columns should be equal");
+    Assert.assertNotSame(withoutVirtualColumns.getPrimaryKeyColumns(), 
schema.getPrimaryKeyColumns(),
+        "Primary key columns should not be the same object");
+
+    withoutVirtualColumns.addField(virtualField);
+    Assert.assertEquals(withoutVirtualColumns, schema, "After adding the 
virtual field both schemas should be equal");
+  }
+
+  @Test
+  public void testWithoutVirtualColumnsRemoveVirtualPrimaryKeys() {
+    DimensionFieldSpec virtualField = new DimensionFieldSpec(
+        CommonConstants.Segment.BuiltInVirtualColumn.DOCID, 
FieldSpec.DataType.INT, true, Integer.class);
+
+    Schema schema = new Schema.SchemaBuilder()
+        .setSchemaName("testSchema")
+        .setEnableColumnBasedNullHandling(true)
+        .addMetricField("metric", FieldSpec.DataType.INT)
+        .addField(virtualField)
+        .setPrimaryKeyColumns(Lists.newArrayList("metric", 
virtualField.getName()))
+        .build();
+
+    Schema withoutVirtualColumns = schema.withoutVirtualColumns();
+
+    Assert.assertEquals(withoutVirtualColumns.getPrimaryKeyColumns(), 
Collections.singletonList("metric"),
+        "Unexpected primary key columns");
+  }
 }
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java 
b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java
index 5aef89db1d..2913345c46 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java
@@ -474,7 +474,7 @@ public abstract class QuickStartBase {
           printStatus(Quickstart.Color.CYAN,
               "***** Starting upsertMeetupRSVPEvents data stream and 
publishing to Kafka *****");
           MeetupRsvpStream upsertMeetupRsvpProvider =
-              new MeetupRsvpStream("upsertMeetupRSVPEvents", 
RsvpSourceGenerator.KeyColumn.EVENT_ID);
+              new MeetupRsvpStream("upsertMeetupRSVPEvents", 
RsvpSourceGenerator.KeyColumn.EVENT_ID, 50);
           upsertMeetupRsvpProvider.run();
           Runtime.getRuntime().addShutdownHook(new Thread(() -> {
             try {
@@ -504,7 +504,7 @@ public abstract class QuickStartBase {
           printStatus(Quickstart.Color.CYAN,
               "***** Starting upsertPartialMeetupRSVPEvents data stream and 
publishing to Kafka *****");
           MeetupRsvpStream upsertPartialMeetupRsvpProvider =
-              new MeetupRsvpStream("upsertPartialMeetupRSVPEvents", 
RsvpSourceGenerator.KeyColumn.EVENT_ID);
+              new MeetupRsvpStream("upsertPartialMeetupRSVPEvents", 
RsvpSourceGenerator.KeyColumn.EVENT_ID, 50);
           upsertPartialMeetupRsvpProvider.run();
           Runtime.getRuntime().addShutdownHook(new Thread(() -> {
             try {
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
index 527ca283fb..688e1bd39b 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
@@ -58,6 +58,11 @@ public class MeetupRsvpStream {
 
   public MeetupRsvpStream(String topicName, RsvpSourceGenerator.KeyColumn 
keyColumn)
       throws Exception {
+    this(topicName, keyColumn, 0);
+  }
+
+  public MeetupRsvpStream(String topicName, RsvpSourceGenerator.KeyColumn 
keyColumn, int nullProbability)
+      throws Exception {
     _topicName = topicName;
     Properties properties = new Properties();
     properties.put("metadata.broker.list", 
KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
@@ -66,7 +71,8 @@ public class MeetupRsvpStream {
     StreamDataProducer producer =
         
StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME,
 properties);
     _pinotRealtimeSource =
-        PinotRealtimeSource.builder().setGenerator(new 
RsvpSourceGenerator(keyColumn)).setProducer(producer)
+        PinotRealtimeSource.builder().setGenerator(new 
RsvpSourceGenerator(keyColumn, nullProbability))
+            .setProducer(producer)
             .setRateLimiter(permits -> {
               int delay = (int) 
(Math.log(ThreadLocalRandom.current().nextDouble()) / Math.log(0.999)) + 1;
               try {
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RsvpSourceGenerator.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RsvpSourceGenerator.java
index a1474d18f5..74c57c86b9 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RsvpSourceGenerator.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RsvpSourceGenerator.java
@@ -39,13 +39,15 @@ import static java.nio.charset.StandardCharsets.UTF_8;
  * A simple random generator that fakes RSVP
  */
 public class RsvpSourceGenerator implements PinotSourceDataGenerator {
+  private final int _nullProbability;
   private final KeyColumn _keyColumn;
   public static final DateTimeFormatter DATE_TIME_FORMATTER =
       new 
DateTimeFormatterBuilder().parseCaseInsensitive().append(DateTimeFormatter.ISO_LOCAL_DATE).appendLiteral('
 ')
           .append(DateTimeFormatter.ISO_LOCAL_TIME).toFormatter();
 
-  public RsvpSourceGenerator(KeyColumn keyColumn) {
+  public RsvpSourceGenerator(KeyColumn keyColumn, int nullProbability) {
     _keyColumn = keyColumn;
+    _nullProbability = nullProbability;
   }
 
   public RSVP createMessage() {
@@ -67,7 +69,8 @@ public class RsvpSourceGenerator implements 
PinotSourceDataGenerator {
     json.put("event_name", eventName);
 
     json.put("event_id", eventId);
-    json.put("event_time", 
DATE_TIME_FORMATTER.format(LocalDateTime.now().plusDays(10)));
+    boolean isNull = ThreadLocalRandom.current().nextInt(100) < 
_nullProbability;
+    json.put("event_time", isNull ? null : 
DATE_TIME_FORMATTER.format(LocalDateTime.now().plusDays(10)));
 
     ArrayNode groupTopicsJson = JsonUtils.newArrayNode();
     groupJson.set("group_topics", groupTopicsJson);
diff --git 
a/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_realtime_table_config.json
 
b/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_realtime_table_config.json
index 6d6c95379b..bbf4e7473c 100644
--- 
a/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_realtime_table_config.json
+++ 
b/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_realtime_table_config.json
@@ -16,7 +16,8 @@
           "numPartitions": 2
         }
       }
-    }
+    },
+    "nullHandlingEnabled": false
   },
   "instanceAssignmentConfigMap": {
     "CONSUMING": {
diff --git 
a/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_schema.json
 
b/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_schema.json
index bb69cc2093..6b56952a13 100644
--- 
a/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_schema.json
+++ 
b/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_schema.json
@@ -63,5 +63,6 @@
   "schemaName": "upsertMeetupRsvp",
   "primaryKeyColumns": [
     "event_id"
-  ]
+  ],
+  "enableColumnBasedNullHandling": true
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to