Jackie-Jiang commented on code in PR #10703:
URL: https://github.com/apache/pinot/pull/10703#discussion_r1239069978
##########
pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java:
##########
@@ -396,11 +399,15 @@ protected TableConfig createRealtimeTableConfig(File
sampleAvroFile) {
/**
* Creates a new Upsert enabled table config.
*/
- protected TableConfig createUpsertTableConfig(File sampleAvroFile, String
primaryKeyColumn, int numPartitions) {
+ protected TableConfig createUpsertTableConfig(File sampleAvroFile, String
primaryKeyColumn, String deletedColumn,
Review Comment:
```suggestion
protected TableConfig createUpsertTableConfig(File sampleAvroFile, String
primaryKeyColumn, String deleteColumn,
```
##########
pinot-integration-tests/src/test/resources/partial_upsert_table_test.schema:
##########
@@ -0,0 +1,35 @@
+{
+ "schemaName": "playerScoresPartialUpsert",
+ "dimensionFieldSpecs": [
+ {
+ "name": "playerId",
+ "dataType": "INT"
+ },
+ {
+ "name": "name",
+ "dataType": "STRING"
+ },
+ {
+ "name": "game",
+ "dataType": "STRING",
+ "singleValueField": "false"
+ },
+ {
+ "name": "deleted",
+ "dataType": "BOOLEAN"
+ }
+ ],
+ "metricFieldSpecs": [
+ {
+ "name": "score",
+ "dataType": "FLOAT"
+ }
+ ],
+ "dateTimeFieldSpecs": [{
+ "name": "timestampInEpoch",
+ "dataType": "LONG",
+ "format" : "1:MILLISECONDS:EPOCH",
+ "granularity": "1:MILLISECONDS"
+ }],
+ "primaryKeyColumns": [ "playerId" ]
+}
Review Comment:
Add an empty line after the closing brace so the file ends with a trailing newline.
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java:
##########
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.client.ResultSet;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+/**
+ * Input data - Scores of players
+ * Schema
+ * - Dimension fields: playerId:int (primary key), name:string, game:string,
deleted:boolean
+ * - Metric fields: score:float
+ * - DateTime fields: timestampInEpoch:long
+ */
+public class UpsertTableIntegrationTest extends BaseClusterIntegrationTestSet {
+ private static final String INPUT_DATA_TAR_FILE = "gameScores_csv.tar.gz";
+ private static final String CSV_SCHEMA_HEADER =
"playerId,name,game,score,timestampInEpoch,deleted";
+ private static final String PARTIAL_UPSERT_TABLE_SCHEMA =
"partial_upsert_table_test.schema";
+ private static final String CSV_DELIMITER = ",";
+ private static final String TABLE_NAME = "gameScores";
+ private static final int NUM_SERVERS = 2;
+ private static final String PRIMARY_KEY_COL = "playerId";
+ protected static final String DELETED_COL = "deleted";
+ @BeforeClass
Review Comment:
Add an empty line before `@BeforeClass` to separate it from the constant declarations.
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -221,18 +256,30 @@ protected void doAddRecord(MutableSegment segment,
RecordInfo recordInfo) {
protected GenericRow doUpdateRecord(GenericRow record, RecordInfo
recordInfo) {
assert _partialUpsertHandler != null;
AtomicReference<GenericRow> previousRecordReference = new
AtomicReference<>();
+ AtomicBoolean outOfOrder = new AtomicBoolean();
RecordLocation currentRecordLocation =
_primaryKeyToRecordLocationMap.computeIfPresent(
HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction),
(pk, recordLocation) -> {
if
(recordInfo.getComparisonValue().compareTo(recordLocation.getComparisonValue())
>= 0) {
- _reuse.clear();
-
previousRecordReference.set(recordLocation.getSegment().getRecord(recordLocation.getDocId(),
_reuse));
+ IndexSegment currentSegment = recordLocation.getSegment();
+ int currentDocId = recordLocation.getDocId();
+ ThreadSafeMutableRoaringBitmap currentQueryableDocIds =
currentSegment.getQueryableDocIds();
+ if (currentQueryableDocIds == null ||
currentQueryableDocIds.contains(currentDocId)) {
+ // if delete is not enabled or previous record not marked as
deleted
+ _reuse.clear();
+
previousRecordReference.set(currentSegment.getRecord(currentDocId, _reuse));
+ }
+ } else {
+ outOfOrder.set(true);
}
return recordLocation;
});
+ GenericRow previousRecord = previousRecordReference.get();
if (currentRecordLocation != null) {
// Existing primary key
- GenericRow previousRecord = previousRecordReference.get();
- if (previousRecord != null) {
+ if (!outOfOrder.get()) {
+ if (recordInfo.isDeleteRecord() || previousRecord == null) {
Review Comment:
We can simplify this check to `if (previousRecord == null)` after applying
the check when reading the previous record
##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java:
##########
@@ -1521,6 +1522,43 @@ public void testValidateUpsertConfig() {
Assert.assertEquals(e.getMessage(),
"Metrics aggregation cannot be enabled in the Indexing Config and
Ingestion Config at the same time");
}
+
+ // Table upsert with delete column
+ String incorrectTypeDelCol = "incorrectTypeDeleteCol";
+ String delCol = "myDelCol";
+ schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+ .setPrimaryKeyColumns(Lists.newArrayList("myPkCol"))
+ .addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
+ .addSingleValueDimension(incorrectTypeDelCol,
FieldSpec.DataType.STRING)
+ .addSingleValueDimension(delCol, FieldSpec.DataType.BOOLEAN)
+ .build();
+ streamConfigs = getStreamConfigs();
+ streamConfigs.put("stream.kafka.consumer.type", "simple");
+
+ upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+ upsertConfig.setDeleteRecordColumn(incorrectTypeDelCol);
+ tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
+ .setUpsertConfig(upsertConfig)
+ .setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+ .build();
+ try {
+ TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
+ Assert.fail("Invalid delete column type (string) should have failed
table creation");
+ } catch (IllegalStateException e) {
+ Assert.assertEquals(e.getMessage(), "The deleted record column must be a
single-valued BOOLEAN column");
Review Comment:
```suggestion
Assert.assertEquals(e.getMessage(), "The delete record column must be
a single-valued BOOLEAN column");
```
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java:
##########
@@ -74,7 +74,7 @@ public void setUp()
// Create and upload schema and table config
Schema schema = createSchema();
addSchema(schema);
- TableConfig tableConfig = createUpsertTableConfig(avroFiles.get(0),
PRIMARY_KEY_COL, getNumKafkaPartitions());
+ TableConfig tableConfig = createUpsertTableConfig(avroFiles.get(0),
PRIMARY_KEY_COL, null, getNumKafkaPartitions());
Review Comment:
This test doesn't need large data as well. It only verifies the upsert
handling for uploaded segments. We can do it in a separate PR
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -221,18 +256,30 @@ protected void doAddRecord(MutableSegment segment,
RecordInfo recordInfo) {
protected GenericRow doUpdateRecord(GenericRow record, RecordInfo
recordInfo) {
assert _partialUpsertHandler != null;
AtomicReference<GenericRow> previousRecordReference = new
AtomicReference<>();
+ AtomicBoolean outOfOrder = new AtomicBoolean();
RecordLocation currentRecordLocation =
_primaryKeyToRecordLocationMap.computeIfPresent(
HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction),
(pk, recordLocation) -> {
if
(recordInfo.getComparisonValue().compareTo(recordLocation.getComparisonValue())
>= 0) {
- _reuse.clear();
-
previousRecordReference.set(recordLocation.getSegment().getRecord(recordLocation.getDocId(),
_reuse));
+ IndexSegment currentSegment = recordLocation.getSegment();
Review Comment:
Add an if check of `if (!recordInfo.isDeleteRecord())`. No need to read
previous record when current record is delete
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -221,18 +256,30 @@ protected void doAddRecord(MutableSegment segment,
RecordInfo recordInfo) {
protected GenericRow doUpdateRecord(GenericRow record, RecordInfo
recordInfo) {
assert _partialUpsertHandler != null;
AtomicReference<GenericRow> previousRecordReference = new
AtomicReference<>();
+ AtomicBoolean outOfOrder = new AtomicBoolean();
RecordLocation currentRecordLocation =
_primaryKeyToRecordLocationMap.computeIfPresent(
HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction),
(pk, recordLocation) -> {
if
(recordInfo.getComparisonValue().compareTo(recordLocation.getComparisonValue())
>= 0) {
- _reuse.clear();
-
previousRecordReference.set(recordLocation.getSegment().getRecord(recordLocation.getDocId(),
_reuse));
+ IndexSegment currentSegment = recordLocation.getSegment();
+ int currentDocId = recordLocation.getDocId();
+ ThreadSafeMutableRoaringBitmap currentQueryableDocIds =
currentSegment.getQueryableDocIds();
+ if (currentQueryableDocIds == null ||
currentQueryableDocIds.contains(currentDocId)) {
+ // if delete is not enabled or previous record not marked as
deleted
+ _reuse.clear();
+
previousRecordReference.set(currentSegment.getRecord(currentDocId, _reuse));
+ }
+ } else {
+ outOfOrder.set(true);
}
return recordLocation;
});
+ GenericRow previousRecord = previousRecordReference.get();
Review Comment:
Move this into the if check (revert the change)
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java:
##########
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.client.ResultSet;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+/**
+ * Input data - Scores of players
+ * Schema
+ * - Dimension fields: playerId:int (primary key), name:string, game:string,
deleted:boolean
+ * - Metric fields: score:float
+ * - DateTime fields: timestampInEpoch:long
+ */
+public class UpsertTableIntegrationTest extends BaseClusterIntegrationTestSet {
+ private static final String INPUT_DATA_TAR_FILE = "gameScores_csv.tar.gz";
+ private static final String CSV_SCHEMA_HEADER =
"playerId,name,game,score,timestampInEpoch,deleted";
+ private static final String PARTIAL_UPSERT_TABLE_SCHEMA =
"partial_upsert_table_test.schema";
+ private static final String CSV_DELIMITER = ",";
+ private static final String TABLE_NAME = "gameScores";
+ private static final int NUM_SERVERS = 2;
+ private static final String PRIMARY_KEY_COL = "playerId";
+ protected static final String DELETED_COL = "deleted";
Review Comment:
```suggestion
protected static final String DELETE_COL = "deleted";
```
##########
pinot-integration-tests/src/test/resources/upsert_table_test.schema:
##########
@@ -1,33 +1,34 @@
{
+ "schemaName": "playerScores",
"dimensionFieldSpecs": [
{
- "dataType": "INT",
- "singleValueField": true,
- "name": "clientId"
+ "name": "playerId",
+ "dataType": "INT"
},
{
- "dataType": "STRING",
- "singleValueField": true,
- "name": "city"
+ "name": "name",
+ "dataType": "STRING"
},
{
- "dataType": "STRING",
- "singleValueField": true,
- "name": "description"
+ "name": "game",
+ "dataType": "STRING"
},
{
- "dataType": "INT",
- "singleValueField": true,
- "name": "salary"
+ "name": "deleted",
+ "dataType": "BOOLEAN"
}
],
- "timeFieldSpec": {
- "incomingGranularitySpec": {
- "timeType": "DAYS",
- "dataType": "INT",
- "name": "DaysSinceEpoch"
+ "metricFieldSpecs": [
+ {
+ "name": "score",
+ "dataType": "FLOAT"
}
- },
- "primaryKeyColumns": ["clientId"],
- "schemaName": "upsertSchema"
-}
+ ],
+ "dateTimeFieldSpecs": [{
+ "name": "timestampInEpoch",
+ "dataType": "LONG",
+ "format" : "1:MILLISECONDS:EPOCH",
+ "granularity": "1:MILLISECONDS"
+ }],
+ "primaryKeyColumns": [ "playerId" ]
+}
Review Comment:
Add an empty line after the closing brace so the file ends with a trailing newline.
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -570,20 +570,29 @@ &&
isRoutingStrategyAllowedForUpsert(tableConfig.getRoutingConfig()),
"Upsert/Dedup table must use strict replica-group (i.e.
strictReplicaGroup) based routing");
// specifically for upsert
- if (tableConfig.getUpsertMode() != UpsertConfig.Mode.NONE) {
-
+ UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
+ if (upsertConfig != null) {
// no startree index
Preconditions.checkState(CollectionUtils.isEmpty(tableConfig.getIndexingConfig().getStarTreeIndexConfigs())
&& !tableConfig.getIndexingConfig().isEnableDefaultStarTree(),
"The upsert table cannot have star-tree index.");
// comparison column exists
- if (tableConfig.getUpsertConfig().getComparisonColumns() != null) {
- List<String> comparisonCols =
tableConfig.getUpsertConfig().getComparisonColumns();
- for (String comparisonCol : comparisonCols) {
- Preconditions.checkState(schema.hasColumn(comparisonCol), "The
comparison column does not exist on schema");
+ List<String> comparisonColumns = upsertConfig.getComparisonColumns();
+ if (comparisonColumns != null) {
+ for (String column : comparisonColumns) {
+ Preconditions.checkState(schema.hasColumn(column), "The comparison
column does not exist on schema");
}
}
+
+ // Delete record column exist and is a BOOLEAN field
+ String deleteRecordColumn = upsertConfig.getDeleteRecordColumn();
+ if (deleteRecordColumn != null) {
+ FieldSpec fieldSpec = schema.getFieldSpecFor(deleteRecordColumn);
+ Preconditions.checkState(
+ fieldSpec != null && fieldSpec.isSingleValueField() &&
fieldSpec.getDataType() == DataType.BOOLEAN,
+ "The deleted record column must be a single-valued BOOLEAN
column");
Review Comment:
```suggestion
"The delete record column must be a single-valued BOOLEAN
column");
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]