This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4823802886 Wire soft upsert delete for compaction task (#12330)
4823802886 is described below
commit 48238028869593fd19a6df862c55cf3facf807af
Author: Seunghyun Lee <[email protected]>
AuthorDate: Sun Jan 28 18:47:48 2024 -0800
Wire soft upsert delete for compaction task (#12330)
---
...sertCompactionMinionClusterIntegrationTest.java | 221 ----------------
.../tests/UpsertTableIntegrationTest.java | 289 ++++++++++++++++-----
.../src/test/resources/gameScores_large_csv.tar.gz | Bin 0 -> 12810 bytes
.../test/resources/upsert_compaction_test.tar.gz | Bin 9405 -> 0 bytes
.../RealtimeToOfflineSegmentsTaskGenerator.java | 4 +-
.../UpsertCompactionTaskGenerator.java | 11 +-
.../pinot/server/api/resources/TablesResource.java | 31 ++-
7 files changed, 261 insertions(+), 295 deletions(-)
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertCompactionMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertCompactionMinionClusterIntegrationTest.java
deleted file mode 100644
index 2239f8d4c1..0000000000
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertCompactionMinionClusterIntegrationTest.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.integration.tests;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.commons.io.FileUtils;
-import org.apache.helix.task.TaskState;
-import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
-import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
-import org.apache.pinot.core.common.MinionConstants;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableTaskConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.apache.pinot.util.TestUtils;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertNull;
-
-
-public class UpsertCompactionMinionClusterIntegrationTest extends BaseClusterIntegrationTest {
- protected PinotHelixTaskResourceManager _helixTaskResourceManager;
- protected PinotTaskManager _taskManager;
- private static final String PRIMARY_KEY_COL = "clientId";
-  private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(DEFAULT_TABLE_NAME);
- private static List<File> _avroFiles;
- private TableConfig _tableConfig;
- private Schema _schema;
-
- @Override
- protected String getSchemaFileName() {
- return "upsert_upload_segment_test.schema";
- }
-
- @Override
- protected String getAvroTarFileName() {
- return "upsert_compaction_test.tar.gz";
- }
-
- @Override
- protected String getPartitionColumn() {
- return PRIMARY_KEY_COL;
- }
-
- private TableTaskConfig getCompactionTaskConfig() {
- Map<String, String> tableTaskConfigs = new HashMap<>();
-    tableTaskConfigs.put(MinionConstants.UpsertCompactionTask.BUFFER_TIME_PERIOD_KEY, "0d");
-    tableTaskConfigs.put(MinionConstants.UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_PERCENT, "1");
-    tableTaskConfigs.put(MinionConstants.UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_COUNT, "10");
-    return new TableTaskConfig(
-        Collections.singletonMap(MinionConstants.UpsertCompactionTask.TASK_TYPE, tableTaskConfigs));
- }
-
- @BeforeClass
- public void setUp()
- throws Exception {
- TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
-
- // Start the Pinot cluster
- startZk();
- startController();
- startBroker();
- startServers(1);
-
- // Unpack the Avro files
- _avroFiles = unpackAvroData(_tempDir);
-
- startKafka();
-
- // Create and upload the schema and table config
- _schema = createSchema();
- addSchema(_schema);
-    _tableConfig = createUpsertTableConfig(_avroFiles.get(0), PRIMARY_KEY_COL, null, getNumKafkaPartitions());
- _tableConfig.setTaskConfig(getCompactionTaskConfig());
- addTableConfig(_tableConfig);
-
-    ClusterIntegrationTestUtils.buildSegmentsFromAvro(_avroFiles, _tableConfig, _schema, 0, _segmentDir, _tarDir);
-
- startMinion();
-    _helixTaskResourceManager = _controllerStarter.getHelixTaskResourceManager();
- _taskManager = _controllerStarter.getTaskManager();
- }
-
- @BeforeMethod
- public void beforeMethod()
- throws Exception {
- // Create and upload segments
- uploadSegments(getTableName(), TableType.REALTIME, _tarDir);
- }
-
- protected void waitForAllDocsLoaded(long timeoutMs, long expectedCount)
- throws Exception {
- TestUtils.waitForCondition(aVoid -> {
- try {
- return getCurrentCountStarResultWithoutUpsert() == expectedCount;
- } catch (Exception e) {
- return null;
- }
- }, 100L, timeoutMs, "Failed to load all documents");
- assertEquals(getCurrentCountStarResult(), getCountStarResult());
- }
-
- private long getCurrentCountStarResultWithoutUpsert() {
-    return getPinotConnection().execute("SELECT COUNT(*) FROM " + getTableName() + " OPTION(skipUpsert=true)")
- .getResultSet(0).getLong(0);
- }
-
- private long getSalary() {
-    return getPinotConnection().execute("SELECT salary FROM " + getTableName() + " WHERE clientId=100001")
- .getResultSet(0).getLong(0);
- }
-
- @Override
- protected long getCountStarResult() {
- return 3;
- }
-
- @AfterMethod
- public void afterMethod()
- throws Exception {
-    String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(getTableName());
-
- // Test dropping all segments one by one
- List<String> segments = listSegments(realtimeTableName);
- assertFalse(segments.isEmpty());
- for (String segment : segments) {
- dropSegment(realtimeTableName, segment);
- }
- // NOTE: There is a delay to remove the segment from property store
- TestUtils.waitForCondition((aVoid) -> {
- try {
- return listSegments(realtimeTableName).isEmpty();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }, 60_000L, "Failed to drop the segments");
- restartServers();
- }
-
- @AfterClass
- public void tearDown()
- throws IOException {
-    String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(getTableName());
- dropRealtimeTable(realtimeTableName);
- stopMinion();
- stopServer();
- stopBroker();
- stopController();
- stopKafka();
- stopZk();
- FileUtils.deleteDirectory(_tempDir);
- }
-
- @Test
- public void testCompaction()
- throws Exception {
- waitForAllDocsLoaded(600_000L, 283);
- assertEquals(getSalary(), 9747108);
-
-    assertNotNull(_taskManager.scheduleTasks(REALTIME_TABLE_NAME).get(MinionConstants.UpsertCompactionTask.TASK_TYPE));
- waitForTaskToComplete();
- waitForAllDocsLoaded(600_000L, 3);
- assertEquals(getSalary(), 9747108);
- }
-
- @Test
- public void testCompactionDeletesSegments()
- throws Exception {
- pushAvroIntoKafka(_avroFiles);
- // Wait for all documents loaded
- waitForAllDocsLoaded(600_000L, 566);
- assertEquals(getSalary(), 9747108);
-
-    assertNull(_taskManager.scheduleTasks(REALTIME_TABLE_NAME).get(MinionConstants.UpsertCompactionTask.TASK_TYPE));
- waitForTaskToComplete();
- waitForAllDocsLoaded(600_000L, 283);
- assertEquals(getSalary(), 9747108);
- }
-
- protected void waitForTaskToComplete() {
- TestUtils.waitForCondition(input -> {
- // Check task state
-      for (TaskState taskState : _helixTaskResourceManager.getTaskStates(MinionConstants.UpsertCompactionTask.TASK_TYPE)
- .values()) {
- if (taskState != TaskState.COMPLETED) {
- return false;
- }
- }
- return true;
- }, 600_000L, "Failed to complete task");
- }
-}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
index 43017f29bd..f788508725 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
@@ -33,15 +33,21 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.helix.HelixManager;
import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.task.TaskState;
import org.apache.pinot.client.ResultSet;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
+import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
+import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
+import org.apache.pinot.core.data.manager.realtime.SegmentBuildTimeLeaseExtender;
import org.apache.pinot.integration.tests.models.DummyTableUpsertMetadataManager;
import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManagerFactory;
import org.apache.pinot.server.starter.helix.BaseServerStarter;
import org.apache.pinot.server.starter.helix.HelixInstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.TenantConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
@@ -56,6 +62,7 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
/**
@@ -66,14 +73,19 @@ import static org.testng.Assert.assertEquals;
* - DataTime fields: timestampInEpoch:long
*/
public class UpsertTableIntegrationTest extends BaseClusterIntegrationTestSet {
- private static final String INPUT_DATA_TAR_FILE = "gameScores_csv.tar.gz";
+  private static final String INPUT_DATA_SMALL_TAR_FILE = "gameScores_csv.tar.gz";
+  private static final String INPUT_DATA_LARGE_TAR_FILE = "gameScores_large_csv.tar.gz";
+
private static final String CSV_SCHEMA_HEADER = "playerId,name,game,score,timestampInEpoch,deleted";
private static final String PARTIAL_UPSERT_TABLE_SCHEMA = "partial_upsert_table_test.schema";
private static final String CSV_DELIMITER = ",";
private static final String TABLE_NAME = "gameScores";
private static final int NUM_SERVERS = 2;
private static final String PRIMARY_KEY_COL = "playerId";
- protected static final String DELETE_COL = "deleted";
+ private static final String DELETE_COL = "deleted";
+
+ private PinotTaskManager _taskManager;
+ private PinotHelixTaskResourceManager _helixTaskResourceManager;
@BeforeClass
public void setUp()
@@ -86,29 +98,23 @@ public class UpsertTableIntegrationTest extends BaseClusterIntegrationTestSet {
startController();
startBroker();
startServers(NUM_SERVERS);
+ startMinion();
// Start Kafka and push data into Kafka
startKafka();
- List<File> unpackDataFiles = unpackTarData(INPUT_DATA_TAR_FILE, _tempDir);
- pushCsvIntoKafka(unpackDataFiles.get(0), getKafkaTopic(), 0); // TODO: Fix
-
- // Create and upload schema and table config
- Schema schema = createSchema();
- addSchema(schema);
-
-    Map<String, String> csvDecoderProperties = getCSVDecoderProperties(CSV_DELIMITER, CSV_SCHEMA_HEADER);
-    TableConfig tableConfig =
-        createCSVUpsertTableConfig(getTableName(), getKafkaTopic(), getNumKafkaPartitions(), csvDecoderProperties, null,
-            PRIMARY_KEY_COL);
-    addTableConfig(tableConfig);
+    // Push data to Kafka and set up table
+    setupTable(getTableName(), getKafkaTopic(), INPUT_DATA_SMALL_TAR_FILE, null);
// Wait for all documents loaded
- waitForAllDocsLoaded(600_000L);
+ waitForAllDocsLoaded(60_000L);
+ assertEquals(getCurrentCountStarResult(), getCountStarResult());
// Create partial upsert table schema
Schema partialUpsertSchema = createSchema(PARTIAL_UPSERT_TABLE_SCHEMA);
addSchema(partialUpsertSchema);
+ _taskManager = _controllerStarter.getTaskManager();
+    _helixTaskResourceManager = _controllerStarter.getHelixTaskResourceManager();
}
@AfterClass
@@ -151,6 +157,15 @@ public class UpsertTableIntegrationTest extends BaseClusterIntegrationTestSet {
return 3;
}
+ @Override
+ protected int getRealtimeSegmentFlushSize() {
+ return 500;
+ }
+
+ private long getCountStarResultWithoutUpsert() {
+ return 10;
+ }
+
private Schema createSchema(String schemaFileName)
throws IOException {
InputStream inputStream = BaseClusterIntegrationTest.class.getClassLoader().getResourceAsStream(schemaFileName);
@@ -163,21 +178,33 @@ public class UpsertTableIntegrationTest extends BaseClusterIntegrationTestSet {
.getResultSet(0).getLong(0);
}
- private long getCountStarResultWithoutUpsert() {
- return 10;
+ private long queryCountStar(String tableName) {
+    return getPinotConnection().execute("SELECT COUNT(*) FROM " + tableName).getResultSet(0).getLong(0);
}
@Override
- protected void waitForAllDocsLoaded(long timeoutMs)
- throws Exception {
+ protected void waitForAllDocsLoaded(long timeoutMs) {
+    waitForAllDocsLoaded(getTableName(), timeoutMs, getCountStarResultWithoutUpsert());
+  }
+
+  private void waitForAllDocsLoaded(String tableName, long timeoutMs, long expectedCountStarWithoutUpsertResult) {
TestUtils.waitForCondition(aVoid -> {
try {
-        return queryCountStarWithoutUpsert(getTableName()) == getCountStarResultWithoutUpsert();
+        return queryCountStarWithoutUpsert(tableName) == expectedCountStarWithoutUpsertResult;
+ } catch (Exception e) {
+ return null;
+ }
+ }, 100L, timeoutMs, "Failed to load all documents");
+ }
+
+  private void waitForNumQueriedSegmentsToConverge(String tableName, long timeoutMs, long expectedNumSegmentsQueried) {
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ return getNumSegmentsQueried(tableName) == expectedNumSegmentsQueried;
} catch (Exception e) {
return null;
}
}, 100L, timeoutMs, "Failed to load all documents");
- assertEquals(getCurrentCountStarResult(), getCountStarResult());
}
@Test
@@ -192,34 +219,16 @@ public class UpsertTableIntegrationTest extends BaseClusterIntegrationTestSet {
protected void testDeleteWithFullUpsert(String kafkaTopicName, String tableName, UpsertConfig upsertConfig)
throws Exception {
// SETUP
- // Create table with delete Record column
-    Map<String, String> csvDecoderProperties = getCSVDecoderProperties(CSV_DELIMITER, CSV_SCHEMA_HEADER);
-    Schema upsertSchema = createSchema();
-    upsertSchema.setSchemaName(tableName);
-    addSchema(upsertSchema);
-    TableConfig tableConfig =
-        createCSVUpsertTableConfig(tableName, kafkaTopicName, getNumKafkaPartitions(), csvDecoderProperties,
-            upsertConfig, PRIMARY_KEY_COL);
- addTableConfig(tableConfig);
-
- // Push initial 10 upsert records - 3 pks 100, 101 and 102
- List<File> dataFiles = unpackTarData(INPUT_DATA_TAR_FILE, _tempDir);
- pushCsvIntoKafka(dataFiles.get(0), kafkaTopicName, 0);
+    setupTable(tableName, kafkaTopicName, INPUT_DATA_SMALL_TAR_FILE, upsertConfig);
// TEST 1: Delete existing primary key
// Push 2 records with deleted = true - deletes pks 100 and 102
-    List<String> deleteRecords = ImmutableList.of("102,Clifford,counter-strike,102,1681054200000,true",
+    List<String> deleteRecords = ImmutableList.of("102,Clifford,counter-strike,102,1681254200000,true",
"100,Zook,counter-strike,2050,1681377200000,true");
pushCsvIntoKafka(deleteRecords, kafkaTopicName, 0);
// Wait for all docs (12 with skipUpsert=true) to be loaded
- TestUtils.waitForCondition(aVoid -> {
- try {
- return queryCountStarWithoutUpsert(tableName) == 12;
- } catch (Exception e) {
- return null;
- }
-    }, 100L, 600_000L, "Failed to load all upsert records for testDeleteWithFullUpsert");
+ waitForAllDocsLoaded(tableName, 600_000L, 12);
// Query for number of records in the table - should only return 1
ResultSet rs = getPinotConnection().execute("SELECT * FROM " + tableName).getResultSet(0);
@@ -252,13 +261,7 @@ public class UpsertTableIntegrationTest extends BaseClusterIntegrationTestSet {
List<String> revivedRecord = Collections.singletonList("100,Zook-New,,0.0,1684707335000,false");
pushCsvIntoKafka(revivedRecord, kafkaTopicName, 0);
// Wait for the new record (13 with skipUpsert=true) to be indexed
- TestUtils.waitForCondition(aVoid -> {
- try {
- return queryCountStarWithoutUpsert(tableName) == 13;
- } catch (Exception e) {
- return null;
- }
-    }, 100L, 600_000L, "Failed to load all upsert records for testDeleteWithFullUpsert");
+ waitForAllDocsLoaded(tableName, 600_000L, 13);
// Validate: pk is queryable and all columns are overwritten with new value
rs = getPinotConnection().execute("SELECT playerId, name, game FROM " + tableName + " WHERE playerId = 100")
@@ -278,6 +281,27 @@ public class UpsertTableIntegrationTest extends BaseClusterIntegrationTestSet {
dropRealtimeTable(tableName);
}
+  private TableConfig setupTable(String tableName, String kafkaTopicName, String inputDataFile,
+ UpsertConfig upsertConfig)
+ throws Exception {
+ // SETUP
+ // Create table with delete Record column
+    Map<String, String> csvDecoderProperties = getCSVDecoderProperties(CSV_DELIMITER, CSV_SCHEMA_HEADER);
+    Schema upsertSchema = createSchema();
+    upsertSchema.setSchemaName(tableName);
+    addSchema(upsertSchema);
+    TableConfig tableConfig =
+        createCSVUpsertTableConfig(tableName, kafkaTopicName, getNumKafkaPartitions(), csvDecoderProperties,
+            upsertConfig, PRIMARY_KEY_COL);
+ addTableConfig(tableConfig);
+
+ // Push initial 10 upsert records - 3 pks 100, 101 and 102
+ List<File> dataFiles = unpackTarData(inputDataFile, _tempDir);
+ pushCsvIntoKafka(dataFiles.get(0), kafkaTopicName, 0);
+
+ return tableConfig;
+ }
+
@Test
public void testDeleteWithPartialUpsert()
throws Exception {
@@ -319,13 +343,7 @@ public class UpsertTableIntegrationTest extends BaseClusterIntegrationTestSet {
pushCsvIntoKafka(deleteRecords, kafkaTopicName, 0);
// Wait for all docs (12 with skipUpsert=true) to be loaded
- TestUtils.waitForCondition(aVoid -> {
- try {
- return queryCountStarWithoutUpsert(tableName) == 12;
- } catch (Exception e) {
- return null;
- }
-    }, 100L, 600_000L, "Failed to load all upsert records for testDeleteWithFullUpsert");
+ waitForAllDocsLoaded(tableName, 600_000L, 12);
// Query for number of records in the table - should only return 1
ResultSet rs = getPinotConnection().execute("SELECT * FROM " + tableName).getResultSet(0);
@@ -358,13 +376,7 @@ public class UpsertTableIntegrationTest extends BaseClusterIntegrationTestSet {
List<String> revivedRecord = Collections.singletonList("100,Zook,,0.0,1684707335000,false");
pushCsvIntoKafka(revivedRecord, kafkaTopicName, 0);
// Wait for the new record (13 with skipUpsert=true) to be indexed
- TestUtils.waitForCondition(aVoid -> {
- try {
- return queryCountStarWithoutUpsert(tableName) == 13;
- } catch (Exception e) {
- return null;
- }
-    }, 100L, 600_000L, "Failed to load all upsert records for testDeleteWithFullUpsert");
+ waitForAllDocsLoaded(tableName, 600_000L, 13);
// Validate: pk is queryable and all columns are overwritten with new value
rs = getPinotConnection().execute("SELECT playerId, name, game FROM " + tableName + " WHERE playerId = 100")
@@ -435,6 +447,157 @@ public class UpsertTableIntegrationTest extends BaseClusterIntegrationTestSet {
if (serverStarter != null) {
serverStarter.stop();
}
+      // Re-initialize the executor for SegmentBuildTimeLeaseExtender to avoid the NullPointerException for the other
+ // tests.
+ SegmentBuildTimeLeaseExtender.initExecutor();
}
}
+
+ @Test
+ public void testUpsertCompaction()
+ throws Exception {
+ final UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+ upsertConfig.setDeleteRecordColumn(DELETE_COL);
+ String tableName = "gameScoresWithCompaction";
+ TableConfig tableConfig =
+        setupTable(tableName, getKafkaTopic() + "-with-compaction", INPUT_DATA_LARGE_TAR_FILE, upsertConfig);
+ tableConfig.setTaskConfig(getCompactionTaskConfig());
+ updateTableConfig(tableConfig);
+ waitForAllDocsLoaded(tableName, 600_000L, 1000);
+ assertEquals(getScore(tableName), 3692);
+ waitForNumQueriedSegmentsToConverge(tableName, 10_000L, 3);
+
+    assertNotNull(_taskManager.scheduleTasks(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName))
+ .get(MinionConstants.UpsertCompactionTask.TASK_TYPE));
+ waitForTaskToComplete();
+ waitForAllDocsLoaded(tableName, 600_000L, 3);
+ assertEquals(getScore(tableName), 3692);
+ waitForNumQueriedSegmentsToConverge(tableName, 10_000L, 3);
+
+ // TEARDOWN
+ dropRealtimeTable(tableName);
+ }
+
+ @Test
+ public void testUpsertCompactionDeletesSegments()
+ throws Exception {
+ final UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+ upsertConfig.setDeleteRecordColumn(DELETE_COL);
+ String tableName = "gameScoresWithCompactionDeleteSegments";
+    String kafkaTopicName = getKafkaTopic() + "-with-compaction-segment-delete";
+    TableConfig tableConfig = setupTable(tableName, kafkaTopicName, INPUT_DATA_LARGE_TAR_FILE, upsertConfig);
+ tableConfig.setTaskConfig(getCompactionTaskConfig());
+ updateTableConfig(tableConfig);
+
+ // Push data one more time
+ List<File> dataFiles = unpackTarData(INPUT_DATA_LARGE_TAR_FILE, _tempDir);
+ pushCsvIntoKafka(dataFiles.get(0), kafkaTopicName, 0);
+ waitForAllDocsLoaded(tableName, 600_000L, 2000);
+ assertEquals(getScore(tableName), 3692);
+ waitForNumQueriedSegmentsToConverge(tableName, 10_000L, 5);
+
+    assertNotNull(_taskManager.scheduleTasks(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName))
+ .get(MinionConstants.UpsertCompactionTask.TASK_TYPE));
+ waitForTaskToComplete();
+ waitForAllDocsLoaded(tableName, 600_000L, 3);
+ assertEquals(getScore(tableName), 3692);
+ waitForNumQueriedSegmentsToConverge(tableName, 10_000L, 3);
+ // TEARDOWN
+ dropRealtimeTable(tableName);
+ }
+
+ @Test
+ public void testUpsertCompactionWithSoftDelete()
+ throws Exception {
+ final UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+ upsertConfig.setDeleteRecordColumn(DELETE_COL);
+ String tableName = "gameScoresWithCompactionWithSoftDelete";
+ String kafkaTopicName = getKafkaTopic() + "-with-compaction-delete";
+    TableConfig tableConfig = setupTable(tableName, kafkaTopicName, INPUT_DATA_LARGE_TAR_FILE, upsertConfig);
+ tableConfig.setTaskConfig(getCompactionTaskConfig());
+ updateTableConfig(tableConfig);
+
+ // Push data one more time
+ List<File> dataFiles = unpackTarData(INPUT_DATA_LARGE_TAR_FILE, _tempDir);
+ pushCsvIntoKafka(dataFiles.get(0), kafkaTopicName, 0);
+ waitForAllDocsLoaded(tableName, 600_000L, 2000);
+ assertEquals(getScore(tableName), 3692);
+ waitForNumQueriedSegmentsToConverge(tableName, 10_000L, 5);
+ assertEquals(queryCountStar(tableName), 3);
+
+ // Push data to delete 2 rows
+    List<String> deleteRecords = ImmutableList.of("102,Clifford,counter-strike,102,1681254200000,true",
+ "100,Zook,counter-strike,2050,1681377200000,true");
+ pushCsvIntoKafka(deleteRecords, kafkaTopicName, 0);
+ waitForAllDocsLoaded(tableName, 600_000L, 2002);
+ assertEquals(queryCountStar(tableName), 1);
+
+    // Run segment compaction. This time, we expect the deleted rows to still be there because they are part of
+    // the consuming segment
+    assertNotNull(_taskManager.scheduleTasks(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName))
+ .get(MinionConstants.UpsertCompactionTask.TASK_TYPE));
+ waitForTaskToComplete();
+ waitForAllDocsLoaded(tableName, 600_000L, 3);
+ assertEquals(getScore(tableName), 3692);
+ waitForNumQueriedSegmentsToConverge(tableName, 10_000L, 2);
+ assertEquals(queryCountStar(tableName), 1);
+
+    // Push data again so that the consuming segment containing the delete records gets flushed
+ pushCsvIntoKafka(dataFiles.get(0), kafkaTopicName, 0);
+ waitForAllDocsLoaded(tableName, 600_000L, 1003);
+ assertEquals(getScore(tableName), 3692);
+ waitForNumQueriedSegmentsToConverge(tableName, 10_000L, 4);
+ assertEquals(queryCountStar(tableName), 1);
+ assertEquals(getNumDeletedRows(tableName), 2);
+
+    // Run segment compaction. This time, we expect the deleted rows to be cleaned up
+    assertNotNull(_taskManager.scheduleTasks(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName))
+ .get(MinionConstants.UpsertCompactionTask.TASK_TYPE));
+ waitForTaskToComplete();
+ waitForAllDocsLoaded(tableName, 600_000L, 3);
+ assertEquals(getScore(tableName), 3692);
+ waitForNumQueriedSegmentsToConverge(tableName, 10_000L, 2);
+ assertEquals(queryCountStar(tableName), 1);
+ assertEquals(getNumDeletedRows(tableName), 0);
+
+ // TEARDOWN
+ dropRealtimeTable(tableName);
+ }
+
+ private long getScore(String tableName) {
+    return (long) getPinotConnection().execute("SELECT score FROM " + tableName + " WHERE playerId = 101")
+ .getResultSet(0).getFloat(0);
+ }
+
+ private long getNumSegmentsQueried(String tableName) {
+    return getPinotConnection().execute("SELECT COUNT(*) FROM " + tableName).getExecutionStats()
+ .getNumSegmentsQueried();
+ }
+
+ private long getNumDeletedRows(String tableName) {
+ return getPinotConnection().execute(
+        "SELECT COUNT(*) FROM " + tableName + " WHERE deleted = true OPTION(skipUpsert=true)").getResultSet(0)
+ .getLong(0);
+ }
+
+ private void waitForTaskToComplete() {
+ TestUtils.waitForCondition(input -> {
+ // Check task state
+      for (TaskState taskState : _helixTaskResourceManager.getTaskStates(MinionConstants.UpsertCompactionTask.TASK_TYPE)
+ .values()) {
+ if (taskState != TaskState.COMPLETED) {
+ return false;
+ }
+ }
+ return true;
+ }, 600_000L, "Failed to complete task");
+ }
+
+ private TableTaskConfig getCompactionTaskConfig() {
+ Map<String, String> tableTaskConfigs = new HashMap<>();
+    tableTaskConfigs.put(MinionConstants.UpsertCompactionTask.BUFFER_TIME_PERIOD_KEY, "0d");
+    tableTaskConfigs.put(MinionConstants.UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_COUNT, "1");
+    return new TableTaskConfig(
+        Collections.singletonMap(MinionConstants.UpsertCompactionTask.TASK_TYPE, tableTaskConfigs));
+ }
}
diff --git a/pinot-integration-tests/src/test/resources/gameScores_large_csv.tar.gz b/pinot-integration-tests/src/test/resources/gameScores_large_csv.tar.gz
new file mode 100644
index 0000000000..e9db06acf7
Binary files /dev/null and b/pinot-integration-tests/src/test/resources/gameScores_large_csv.tar.gz differ
diff --git a/pinot-integration-tests/src/test/resources/upsert_compaction_test.tar.gz b/pinot-integration-tests/src/test/resources/upsert_compaction_test.tar.gz
deleted file mode 100644
index 76b0a50126..0000000000
Binary files a/pinot-integration-tests/src/test/resources/upsert_compaction_test.tar.gz and /dev/null differ
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
index c9654dd2a7..2ad9b415d2 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
@@ -105,8 +105,8 @@ public class RealtimeToOfflineSegmentsTaskGenerator extends BaseTaskGenerator {
Map<String, TaskState> incompleteTasks =
TaskGeneratorUtils.getIncompleteTasks(taskType, realtimeTableName, _clusterInfoAccessor);
if (!incompleteTasks.isEmpty()) {
-        LOGGER.warn("Found incomplete tasks: {} for same table: {}. Skipping task generation.",
-            incompleteTasks.keySet(), realtimeTableName);
+        LOGGER.warn("Found incomplete tasks: {} for same table: {} and task type: {}. Skipping task generation.",
+            incompleteTasks.keySet(), realtimeTableName, taskType);
continue;
}
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
index e6eaf3679e..210e892434 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
@@ -26,11 +26,13 @@ import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
+import org.apache.helix.task.TaskState;
import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.restlet.resources.ValidDocIdMetadataInfo;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
+import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtils;
import org.apache.pinot.controller.util.ServerSegmentMetadataReader;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.common.MinionConstants.UpsertCompactionTask;
@@ -97,7 +99,14 @@ public class UpsertCompactionTaskGenerator extends BaseTaskGenerator {
continue;
}
- // TODO: add a check to see if the task is already running for the table
+ // Only schedule 1 task of this type, per table
+ Map<String, TaskState> incompleteTasks =
+          TaskGeneratorUtils.getIncompleteTasks(taskType, tableNameWithType, _clusterInfoAccessor);
+      if (!incompleteTasks.isEmpty()) {
+        LOGGER.warn("Found incomplete tasks: {} for same table: {} and task type: {}. Skipping task generation.",
+ incompleteTasks.keySet(), tableNameWithType, taskType);
+ continue;
+ }
// get server to segment mappings
PinotHelixResourceManager pinotHelixResourceManager = _clusterInfoAccessor.getPinotHelixResourceManager();
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index 29f90715da..20bba92ffb 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -500,15 +500,22 @@ public class TablesResource {
String.format("Table %s segment %s is not a immutable segment", tableNameWithType, segmentName),
Response.Status.BAD_REQUEST);
}
-      MutableRoaringBitmap validDocIds =
-          indexSegment.getValidDocIds() != null ? indexSegment.getValidDocIds().getMutableRoaringBitmap() : null;
- if (validDocIds == null) {
+
+      // Adopt the same logic as the query execution ('FilterPlanNode.run()') to get the valid doc ids:
+      // if queryableDocIds is available (upsert delete is enabled), read the valid doc ids from it;
+      // otherwise, fall back to validDocIds.
+      final MutableRoaringBitmap validDocIdSnapshot;
+      if (indexSegment.getQueryableDocIds() != null) {
+        validDocIdSnapshot = indexSegment.getQueryableDocIds().getMutableRoaringBitmap();
+      } else if (indexSegment.getValidDocIds() != null) {
+        validDocIdSnapshot = indexSegment.getValidDocIds().getMutableRoaringBitmap();
+ } else {
throw new WebApplicationException(
String.format("Missing validDocIds for table %s segment %s does not exist", tableNameWithType, segmentName),
Response.Status.NOT_FOUND);
}
-      byte[] validDocIdsBytes = RoaringBitmapUtils.serialize(validDocIds);
+      byte[] validDocIdsBytes = RoaringBitmapUtils.serialize(validDocIdSnapshot);
Response.ResponseBuilder builder = Response.ok(validDocIdsBytes);
builder.header(HttpHeaders.CONTENT_LENGTH, validDocIdsBytes.length);
return builder.build();
@@ -580,17 +587,25 @@ public class TablesResource {
LOGGER.warn(msg);
continue;
}
- MutableRoaringBitmap validDocIds =
-            indexSegment.getValidDocIds() != null ? indexSegment.getValidDocIds().getMutableRoaringBitmap() : null;
- if (validDocIds == null) {
+
+        // Adopt the same logic as the query execution ('FilterPlanNode.run()') to get the valid doc ids:
+        // if queryableDocIds is available (upsert delete is enabled), read the valid doc ids from it;
+        // otherwise, fall back to validDocIds.
+        final MutableRoaringBitmap validDocIdSnapshot;
+        if (indexSegment.getQueryableDocIds() != null) {
+          validDocIdSnapshot = indexSegment.getQueryableDocIds().getMutableRoaringBitmap();
+        } else if (indexSegment.getValidDocIds() != null) {
+          validDocIdSnapshot = indexSegment.getValidDocIds().getMutableRoaringBitmap();
+ } else {
String msg = String.format("Missing validDocIds for table %s segment %s does not exist", tableNameWithType,
segmentDataManager.getSegmentName());
LOGGER.warn(msg);
throw new WebApplicationException(msg, Response.Status.NOT_FOUND);
}
+
Map<String, Object> validDocIdMetadata = new HashMap<>();
int totalDocs = indexSegment.getSegmentMetadata().getTotalDocs();
- int totalValidDocs = validDocIds.getCardinality();
+ int totalValidDocs = validDocIdSnapshot.getCardinality();
int totalInvalidDocs = totalDocs - totalValidDocs;
validDocIdMetadata.put("segmentName", segmentDataManager.getSegmentName());
validDocIdMetadata.put("totalDocs", totalDocs);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]