This is an automated email from the ASF dual-hosted git repository.
gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new dfc2cd06d8 RealtimeSegmentConverter was using incorrect schema (#13877)
dfc2cd06d8 is described below
commit dfc2cd06d86f60a24f5facf2f02e3bd37ae0efc5
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Wed Sep 18 10:30:40 2024 +0200
RealtimeSegmentConverter was using incorrect schema (#13877)
Change the way the schema is copied in RealtimeSegmentConverter to keep column-based
nullability
---
.../converter/RealtimeSegmentConverter.java | 9 +---
.../converter/RealtimeSegmentConverterTest.java | 28 +++++++++++--
.../java/org/apache/pinot/spi/data/Schema.java | 38 +++++++++++++++++
.../java/org/apache/pinot/spi/data/SchemaTest.java | 48 ++++++++++++++++++++++
.../org/apache/pinot/tools/QuickStartBase.java | 4 +-
.../pinot/tools/streams/MeetupRsvpStream.java | 8 +++-
.../pinot/tools/streams/RsvpSourceGenerator.java | 7 +++-
.../upsertMeetupRsvp_realtime_table_config.json | 3 +-
.../upsertMeetupRsvp/upsertMeetupRsvp_schema.json | 3 +-
9 files changed, 129 insertions(+), 19 deletions(-)
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
index 977bba0451..2082f35622 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
@@ -40,7 +40,6 @@ import org.apache.pinot.spi.config.table.IndexConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.SegmentZKPropsConfig;
import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline;
@@ -164,13 +163,7 @@ public class RealtimeSegmentConverter {
*/
@VisibleForTesting
public static Schema getUpdatedSchema(Schema original) {
- Schema newSchema = new Schema();
- for (FieldSpec fieldSpec : original.getAllFieldSpecs()) {
- if (!fieldSpec.isVirtualColumn()) {
- newSchema.addField(fieldSpec);
- }
- }
- return newSchema;
+ return original.withoutVirtualColumns();
}
public boolean isColumnMajorEnabled() {
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java
index e2dd41dbb9..5e6a2fe555 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java
@@ -29,7 +29,9 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ServerMetrics;
@@ -73,6 +75,7 @@ import org.testng.annotations.Test;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
@@ -99,14 +102,31 @@ public class RealtimeSegmentConverterTest {
@Test
public void testNoVirtualColumnsInSchema() {
- Schema schema = new Schema.SchemaBuilder().addSingleValueDimension("col1",
FieldSpec.DataType.STRING)
+ // @formatter:off
+ Schema schema = new Schema.SchemaBuilder()
+ .setSchemaName("someName")
+ .setEnableColumnBasedNullHandling(true)
+ .addSingleValueDimension("col1", FieldSpec.DataType.STRING)
.addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG,
TimeUnit.MILLISECONDS, "col1"),
- new TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.DAYS,
"col2")).build();
+ new TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.DAYS,
"col2"))
+ .build();
+ // @formatter:on
String segmentName = "segment1";
VirtualColumnProviderFactory.addBuiltInVirtualColumnsToSegmentSchema(schema,
segmentName);
- assertEquals(schema.getColumnNames().size(), 5);
+ Set<FieldSpec> initialVirtualColumns = getVirtualColumns(schema);
+ assertNotEquals(initialVirtualColumns, Collections.emptySet(), "Initial
virtual columns should not be empty");
Schema newSchema = RealtimeSegmentConverter.getUpdatedSchema(schema);
- assertEquals(newSchema.getColumnNames().size(), 2);
+ Set<FieldSpec> newVirtualColumns = getVirtualColumns(newSchema);
+ assertEquals(newVirtualColumns, Collections.emptySet(), "Virtual columns
should be removed");
+ assertEquals(newSchema.getSchemaName(), schema.getSchemaName(), "Schema
name should be the same");
+ assertEquals(newSchema.isEnableColumnBasedNullHandling(),
schema.isEnableColumnBasedNullHandling(),
+ "Column based null handling should be the same");
+ }
+
+ private Set<FieldSpec> getVirtualColumns(Schema schema) {
+ return schema.getAllFieldSpecs().stream()
+ .filter(FieldSpec::isVirtualColumn)
+ .collect(Collectors.toSet());
}
@Test
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
index a7439d5d1f..d154825746 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
@@ -38,6 +38,7 @@ import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.spi.data.FieldSpec.DataType;
@@ -73,6 +74,7 @@ public final class Schema implements Serializable {
private final List<ComplexFieldSpec> _complexFieldSpecs = new ArrayList<>();
// names of the columns that used as primary keys
// TODO(yupeng): add validation checks like duplicate columns and use of
time column
+ @Nullable
private List<String> _primaryKeyColumns;
// Json ignored fields
@@ -176,6 +178,7 @@ public final class Schema implements Serializable {
_enableColumnBasedNullHandling = enableColumnBasedNullHandling;
}
+ @Nullable
public List<String> getPrimaryKeyColumns() {
return _primaryKeyColumns;
}
@@ -376,6 +379,34 @@ public final class Schema implements Serializable {
return physicalColumnNames;
}
+ /**
+ * Returns a new schema containing only the physical (non-virtual) columns.
+ *
+ * The schema name and the column-based null handling flag are copied. Primary key columns that are virtual or
+ * have no field spec are dropped. Field specs are shared with this schema (shallow copy); this schema is unchanged.
+ */
+ public Schema withoutVirtualColumns() {
+ Schema newSchema = new Schema();
+ newSchema.setSchemaName(getSchemaName());
+
newSchema.setEnableColumnBasedNullHandling(isEnableColumnBasedNullHandling());
+ List<String> primaryKeyColumns = getPrimaryKeyColumns();
+ if (primaryKeyColumns != null) {
+ newSchema.setPrimaryKeyColumns(primaryKeyColumns.stream()
+ .filter(primaryKey -> {
+ FieldSpec fieldSpec = _fieldSpecMap.get(primaryKey);
+ return fieldSpec != null && !fieldSpec.isVirtualColumn();
+ })
+ .collect(Collectors.toList())
+ );
+ }
+ for (FieldSpec fieldSpec : getAllFieldSpecs()) {
+ if (!fieldSpec.isVirtualColumn()) {
+ newSchema.addField(fieldSpec);
+ }
+ }
+ return newSchema;
+ }
+
/**
* NOTE: The returned FieldSpecs are sorted with the column name
alphabetically.
*/
@@ -840,10 +871,17 @@ public final class Schema implements Serializable {
return result;
}
+ /**
+ * @deprecated this method is not correctly implemented, i.e. it doesn't call
super.clone() and does not create a deep copy
+ * of the fieldSpecs.
+ */
+ @Deprecated
+ @Override
public Schema clone() {
Schema cloned = new SchemaBuilder()
.setSchemaName(getSchemaName())
.setPrimaryKeyColumns(getPrimaryKeyColumns())
+ .setEnableColumnBasedNullHandling(isEnableColumnBasedNullHandling())
.build();
getAllFieldSpecs().forEach(fieldSpec -> cloned.addField(fieldSpec));
return cloned;
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/data/SchemaTest.java
b/pinot-spi/src/test/java/org/apache/pinot/spi/data/SchemaTest.java
index c4a2aee5be..a70074d08c 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/data/SchemaTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/data/SchemaTest.java
@@ -22,9 +22,11 @@ import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
import java.math.BigDecimal;
import java.sql.Timestamp;
+import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.spi.data.TimeGranularitySpec.TimeFormat;
import org.apache.pinot.spi.utils.BytesUtils;
+import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -646,4 +648,50 @@ public class SchemaTest {
Assert.assertEquals(newSchema.getFieldSpecFor("svBoolean").getDataType(),
FieldSpec.DataType.BOOLEAN);
Assert.assertEquals(newSchema.getFieldSpecFor("svBooleanWithDefault").getDataType(),
FieldSpec.DataType.BOOLEAN);
}
+
+ @Test
+ public void testWithoutVirtualColumns() {
+ DimensionFieldSpec virtualField = new DimensionFieldSpec(
+ CommonConstants.Segment.BuiltInVirtualColumn.DOCID,
FieldSpec.DataType.INT, true, Integer.class);
+
+ Schema schema = new Schema.SchemaBuilder()
+ .setSchemaName("testSchema")
+ .setEnableColumnBasedNullHandling(true)
+ .addMetricField("metric", FieldSpec.DataType.INT)
+ .addField(virtualField)
+ .setPrimaryKeyColumns(Lists.newArrayList("metric"))
+ .build();
+
+ Schema withoutVirtualColumns = schema.withoutVirtualColumns();
+ Assert.assertTrue(withoutVirtualColumns.getAllFieldSpecs().stream()
+ .noneMatch(field ->
field.getName().equals(CommonConstants.Segment.BuiltInVirtualColumn.DOCID)),
+ "Virtual column " + CommonConstants.Segment.BuiltInVirtualColumn.DOCID
+ " should be removed");
+
+ Assert.assertEquals(withoutVirtualColumns.getPrimaryKeyColumns(),
schema.getPrimaryKeyColumns(),
+ "Primary key columns should be equal");
+ Assert.assertNotSame(withoutVirtualColumns.getPrimaryKeyColumns(),
schema.getPrimaryKeyColumns(),
+ "Primary key columns should not be the same object");
+
+ withoutVirtualColumns.addField(virtualField);
+ Assert.assertEquals(withoutVirtualColumns, schema, "After adding the
virtual field both schemas should be equal");
+ }
+
+ @Test
+ public void testWithoutVirtualColumnsRemoveVirtualPrimaryKeys() {
+ DimensionFieldSpec virtualField = new DimensionFieldSpec(
+ CommonConstants.Segment.BuiltInVirtualColumn.DOCID,
FieldSpec.DataType.INT, true, Integer.class);
+
+ Schema schema = new Schema.SchemaBuilder()
+ .setSchemaName("testSchema")
+ .setEnableColumnBasedNullHandling(true)
+ .addMetricField("metric", FieldSpec.DataType.INT)
+ .addField(virtualField)
+ .setPrimaryKeyColumns(Lists.newArrayList("metric",
virtualField.getName()))
+ .build();
+
+ Schema withoutVirtualColumns = schema.withoutVirtualColumns();
+
+ Assert.assertEquals(withoutVirtualColumns.getPrimaryKeyColumns(),
Collections.singletonList("metric"),
+ "Unexpected primary key columns");
+ }
}
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java
index 5aef89db1d..2913345c46 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java
@@ -474,7 +474,7 @@ public abstract class QuickStartBase {
printStatus(Quickstart.Color.CYAN,
"***** Starting upsertMeetupRSVPEvents data stream and
publishing to Kafka *****");
MeetupRsvpStream upsertMeetupRsvpProvider =
- new MeetupRsvpStream("upsertMeetupRSVPEvents",
RsvpSourceGenerator.KeyColumn.EVENT_ID);
+ new MeetupRsvpStream("upsertMeetupRSVPEvents",
RsvpSourceGenerator.KeyColumn.EVENT_ID, 50);
upsertMeetupRsvpProvider.run();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
@@ -504,7 +504,7 @@ public abstract class QuickStartBase {
printStatus(Quickstart.Color.CYAN,
"***** Starting upsertPartialMeetupRSVPEvents data stream and
publishing to Kafka *****");
MeetupRsvpStream upsertPartialMeetupRsvpProvider =
- new MeetupRsvpStream("upsertPartialMeetupRSVPEvents",
RsvpSourceGenerator.KeyColumn.EVENT_ID);
+ new MeetupRsvpStream("upsertPartialMeetupRSVPEvents",
RsvpSourceGenerator.KeyColumn.EVENT_ID, 50);
upsertPartialMeetupRsvpProvider.run();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
index 527ca283fb..688e1bd39b 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
@@ -58,6 +58,11 @@ public class MeetupRsvpStream {
public MeetupRsvpStream(String topicName, RsvpSourceGenerator.KeyColumn
keyColumn)
throws Exception {
+ this(topicName, keyColumn, 0);
+ }
+
+ public MeetupRsvpStream(String topicName, RsvpSourceGenerator.KeyColumn
keyColumn, int nullProbability)
+ throws Exception {
_topicName = topicName;
Properties properties = new Properties();
properties.put("metadata.broker.list",
KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
@@ -66,7 +71,8 @@ public class MeetupRsvpStream {
StreamDataProducer producer =
StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME,
properties);
_pinotRealtimeSource =
- PinotRealtimeSource.builder().setGenerator(new
RsvpSourceGenerator(keyColumn)).setProducer(producer)
+ PinotRealtimeSource.builder().setGenerator(new
RsvpSourceGenerator(keyColumn, nullProbability))
+ .setProducer(producer)
.setRateLimiter(permits -> {
int delay = (int)
(Math.log(ThreadLocalRandom.current().nextDouble()) / Math.log(0.999)) + 1;
try {
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RsvpSourceGenerator.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RsvpSourceGenerator.java
index a1474d18f5..74c57c86b9 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RsvpSourceGenerator.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RsvpSourceGenerator.java
@@ -39,13 +39,15 @@ import static java.nio.charset.StandardCharsets.UTF_8;
* A simple random generator that fakes RSVP
*/
public class RsvpSourceGenerator implements PinotSourceDataGenerator {
+ private final int _nullProbability;
private final KeyColumn _keyColumn;
public static final DateTimeFormatter DATE_TIME_FORMATTER =
new
DateTimeFormatterBuilder().parseCaseInsensitive().append(DateTimeFormatter.ISO_LOCAL_DATE).appendLiteral('
')
.append(DateTimeFormatter.ISO_LOCAL_TIME).toFormatter();
- public RsvpSourceGenerator(KeyColumn keyColumn) {
+ public RsvpSourceGenerator(KeyColumn keyColumn, int nullProbability) {
_keyColumn = keyColumn;
+ _nullProbability = nullProbability;
}
public RSVP createMessage() {
@@ -67,7 +69,8 @@ public class RsvpSourceGenerator implements
PinotSourceDataGenerator {
json.put("event_name", eventName);
json.put("event_id", eventId);
- json.put("event_time",
DATE_TIME_FORMATTER.format(LocalDateTime.now().plusDays(10)));
+ boolean isNull = ThreadLocalRandom.current().nextInt(100) <
_nullProbability;
+ json.put("event_time", isNull ? null :
DATE_TIME_FORMATTER.format(LocalDateTime.now().plusDays(10)));
ArrayNode groupTopicsJson = JsonUtils.newArrayNode();
groupJson.set("group_topics", groupTopicsJson);
diff --git
a/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_realtime_table_config.json
b/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_realtime_table_config.json
index 6d6c95379b..bbf4e7473c 100644
---
a/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_realtime_table_config.json
+++
b/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_realtime_table_config.json
@@ -16,7 +16,8 @@
"numPartitions": 2
}
}
- }
+ },
+ "nullHandlingEnabled": false
},
"instanceAssignmentConfigMap": {
"CONSUMING": {
diff --git
a/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_schema.json
b/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_schema.json
index bb69cc2093..6b56952a13 100644
---
a/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_schema.json
+++
b/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_schema.json
@@ -63,5 +63,6 @@
"schemaName": "upsertMeetupRsvp",
"primaryKeyColumns": [
"event_id"
- ]
+ ],
+ "enableColumnBasedNullHandling": true
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]