This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4823802886 Wire soft upsert delete for compaction task (#12330)
4823802886 is described below

commit 48238028869593fd19a6df862c55cf3facf807af
Author: Seunghyun Lee <[email protected]>
AuthorDate: Sun Jan 28 18:47:48 2024 -0800

    Wire soft upsert delete for compaction task (#12330)
---
 ...sertCompactionMinionClusterIntegrationTest.java | 221 ----------------
 .../tests/UpsertTableIntegrationTest.java          | 289 ++++++++++++++++-----
 .../src/test/resources/gameScores_large_csv.tar.gz | Bin 0 -> 12810 bytes
 .../test/resources/upsert_compaction_test.tar.gz   | Bin 9405 -> 0 bytes
 .../RealtimeToOfflineSegmentsTaskGenerator.java    |   4 +-
 .../UpsertCompactionTaskGenerator.java             |  11 +-
 .../pinot/server/api/resources/TablesResource.java |  31 ++-
 7 files changed, 261 insertions(+), 295 deletions(-)
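
For orientation before the diff: "soft upsert delete" is enabled by configuring a delete-record column on the table's UpsertConfig, and compaction then works off the queryable doc ids that exclude those soft-deleted rows. A minimal sketch of the table-side setup, using only names that appear in the tests below (everything else, including where the config object ends up, is assumed):

    import org.apache.pinot.spi.config.table.UpsertConfig;

    // Hedged sketch, mirroring the setup in UpsertTableIntegrationTest below:
    // full upsert mode plus a soft-delete marker column named "deleted".
    UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
    upsertConfig.setDeleteRecordColumn("deleted");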

diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertCompactionMinionClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertCompactionMinionClusterIntegrationTest.java
deleted file mode 100644
index 2239f8d4c1..0000000000
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertCompactionMinionClusterIntegrationTest.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.integration.tests;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.commons.io.FileUtils;
-import org.apache.helix.task.TaskState;
-import 
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
-import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
-import org.apache.pinot.core.common.MinionConstants;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableTaskConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.apache.pinot.util.TestUtils;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertNull;
-
-
-public class UpsertCompactionMinionClusterIntegrationTest extends 
BaseClusterIntegrationTest {
-  protected PinotHelixTaskResourceManager _helixTaskResourceManager;
-  protected PinotTaskManager _taskManager;
-  private static final String PRIMARY_KEY_COL = "clientId";
-  private static final String REALTIME_TABLE_NAME = 
TableNameBuilder.REALTIME.tableNameWithType(DEFAULT_TABLE_NAME);
-  private static List<File> _avroFiles;
-  private TableConfig _tableConfig;
-  private Schema _schema;
-
-  @Override
-  protected String getSchemaFileName() {
-    return "upsert_upload_segment_test.schema";
-  }
-
-  @Override
-  protected String getAvroTarFileName() {
-    return "upsert_compaction_test.tar.gz";
-  }
-
-  @Override
-  protected String getPartitionColumn() {
-    return PRIMARY_KEY_COL;
-  }
-
-  private TableTaskConfig getCompactionTaskConfig() {
-    Map<String, String> tableTaskConfigs = new HashMap<>();
-    
tableTaskConfigs.put(MinionConstants.UpsertCompactionTask.BUFFER_TIME_PERIOD_KEY,
 "0d");
-    
tableTaskConfigs.put(MinionConstants.UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_PERCENT,
 "1");
-    
tableTaskConfigs.put(MinionConstants.UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_COUNT,
 "10");
-    return new TableTaskConfig(
-        
Collections.singletonMap(MinionConstants.UpsertCompactionTask.TASK_TYPE, 
tableTaskConfigs));
-  }
-
-  @BeforeClass
-  public void setUp()
-      throws Exception {
-    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
-
-    // Start the Pinot cluster
-    startZk();
-    startController();
-    startBroker();
-    startServers(1);
-
-    // Unpack the Avro files
-    _avroFiles = unpackAvroData(_tempDir);
-
-    startKafka();
-
-    // Create and upload the schema and table config
-    _schema = createSchema();
-    addSchema(_schema);
-    _tableConfig = createUpsertTableConfig(_avroFiles.get(0), PRIMARY_KEY_COL, 
null, getNumKafkaPartitions());
-    _tableConfig.setTaskConfig(getCompactionTaskConfig());
-    addTableConfig(_tableConfig);
-
-    ClusterIntegrationTestUtils.buildSegmentsFromAvro(_avroFiles, 
_tableConfig, _schema, 0, _segmentDir, _tarDir);
-
-    startMinion();
-    _helixTaskResourceManager = 
_controllerStarter.getHelixTaskResourceManager();
-    _taskManager = _controllerStarter.getTaskManager();
-  }
-
-  @BeforeMethod
-  public void beforeMethod()
-      throws Exception {
-    // Create and upload segments
-    uploadSegments(getTableName(), TableType.REALTIME, _tarDir);
-  }
-
-  protected void waitForAllDocsLoaded(long timeoutMs, long expectedCount)
-      throws Exception {
-    TestUtils.waitForCondition(aVoid -> {
-      try {
-        return getCurrentCountStarResultWithoutUpsert() == expectedCount;
-      } catch (Exception e) {
-        return null;
-      }
-    }, 100L, timeoutMs, "Failed to load all documents");
-    assertEquals(getCurrentCountStarResult(), getCountStarResult());
-  }
-
-  private long getCurrentCountStarResultWithoutUpsert() {
-    return getPinotConnection().execute("SELECT COUNT(*) FROM " + 
getTableName() + " OPTION(skipUpsert=true)")
-        .getResultSet(0).getLong(0);
-  }
-
-  private long getSalary() {
-    return getPinotConnection().execute("SELECT salary FROM " + getTableName() 
+ " WHERE clientId=100001")
-        .getResultSet(0).getLong(0);
-  }
-
-  @Override
-  protected long getCountStarResult() {
-    return 3;
-  }
-
-  @AfterMethod
-  public void afterMethod()
-      throws Exception {
-    String realtimeTableName = 
TableNameBuilder.REALTIME.tableNameWithType(getTableName());
-
-    // Test dropping all segments one by one
-    List<String> segments = listSegments(realtimeTableName);
-    assertFalse(segments.isEmpty());
-    for (String segment : segments) {
-      dropSegment(realtimeTableName, segment);
-    }
-    // NOTE: There is a delay to remove the segment from property store
-    TestUtils.waitForCondition((aVoid) -> {
-      try {
-        return listSegments(realtimeTableName).isEmpty();
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }, 60_000L, "Failed to drop the segments");
-    restartServers();
-  }
-
-  @AfterClass
-  public void tearDown()
-      throws IOException {
-    String realtimeTableName = 
TableNameBuilder.REALTIME.tableNameWithType(getTableName());
-    dropRealtimeTable(realtimeTableName);
-    stopMinion();
-    stopServer();
-    stopBroker();
-    stopController();
-    stopKafka();
-    stopZk();
-    FileUtils.deleteDirectory(_tempDir);
-  }
-
-  @Test
-  public void testCompaction()
-      throws Exception {
-    waitForAllDocsLoaded(600_000L, 283);
-    assertEquals(getSalary(), 9747108);
-
-    
assertNotNull(_taskManager.scheduleTasks(REALTIME_TABLE_NAME).get(MinionConstants.UpsertCompactionTask.TASK_TYPE));
-    waitForTaskToComplete();
-    waitForAllDocsLoaded(600_000L, 3);
-    assertEquals(getSalary(), 9747108);
-  }
-
-  @Test
-  public void testCompactionDeletesSegments()
-      throws Exception {
-    pushAvroIntoKafka(_avroFiles);
-    // Wait for all documents loaded
-    waitForAllDocsLoaded(600_000L, 566);
-    assertEquals(getSalary(), 9747108);
-
-    
assertNull(_taskManager.scheduleTasks(REALTIME_TABLE_NAME).get(MinionConstants.UpsertCompactionTask.TASK_TYPE));
-    waitForTaskToComplete();
-    waitForAllDocsLoaded(600_000L, 283);
-    assertEquals(getSalary(), 9747108);
-  }
-
-  protected void waitForTaskToComplete() {
-    TestUtils.waitForCondition(input -> {
-      // Check task state
-      for (TaskState taskState : 
_helixTaskResourceManager.getTaskStates(MinionConstants.UpsertCompactionTask.TASK_TYPE)
-          .values()) {
-        if (taskState != TaskState.COMPLETED) {
-          return false;
-        }
-      }
-      return true;
-    }, 600_000L, "Failed to complete task");
-  }
-}
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
index 43017f29bd..f788508725 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
@@ -33,15 +33,21 @@ import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.helix.HelixManager;
 import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.task.TaskState;
 import org.apache.pinot.client.ResultSet;
 import org.apache.pinot.common.utils.config.TagNameUtils;
 import org.apache.pinot.common.utils.helix.HelixHelper;
+import 
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
+import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
+import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
+import 
org.apache.pinot.core.data.manager.realtime.SegmentBuildTimeLeaseExtender;
 import 
org.apache.pinot.integration.tests.models.DummyTableUpsertMetadataManager;
 import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManagerFactory;
 import org.apache.pinot.server.starter.helix.BaseServerStarter;
 import org.apache.pinot.server.starter.helix.HelixInstanceDataManagerConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.TenantConfig;
 import org.apache.pinot.spi.config.table.UpsertConfig;
@@ -56,6 +62,7 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
 
 
 /**
@@ -66,14 +73,19 @@ import static org.testng.Assert.assertEquals;
  *  - DataTime fields: timestampInEpoch:long
  */
 public class UpsertTableIntegrationTest extends BaseClusterIntegrationTestSet {
-  private static final String INPUT_DATA_TAR_FILE = "gameScores_csv.tar.gz";
+  private static final String INPUT_DATA_SMALL_TAR_FILE = 
"gameScores_csv.tar.gz";
+  private static final String INPUT_DATA_LARGE_TAR_FILE = 
"gameScores_large_csv.tar.gz";
+
   private static final String CSV_SCHEMA_HEADER = 
"playerId,name,game,score,timestampInEpoch,deleted";
   private static final String PARTIAL_UPSERT_TABLE_SCHEMA = 
"partial_upsert_table_test.schema";
   private static final String CSV_DELIMITER = ",";
   private static final String TABLE_NAME = "gameScores";
   private static final int NUM_SERVERS = 2;
   private static final String PRIMARY_KEY_COL = "playerId";
-  protected static final String DELETE_COL = "deleted";
+  private static final String DELETE_COL = "deleted";
+
+  private PinotTaskManager _taskManager;
+  private PinotHelixTaskResourceManager _helixTaskResourceManager;
 
   @BeforeClass
   public void setUp()
@@ -86,29 +98,23 @@ public class UpsertTableIntegrationTest extends 
BaseClusterIntegrationTestSet {
     startController();
     startBroker();
     startServers(NUM_SERVERS);
+    startMinion();
 
     // Start Kafka and push data into Kafka
     startKafka();
 
-    List<File> unpackDataFiles = unpackTarData(INPUT_DATA_TAR_FILE, _tempDir);
-    pushCsvIntoKafka(unpackDataFiles.get(0), getKafkaTopic(), 0);  // TODO: Fix
-
-    // Create and upload schema and table config
-    Schema schema = createSchema();
-    addSchema(schema);
-
-    Map<String, String> csvDecoderProperties = 
getCSVDecoderProperties(CSV_DELIMITER, CSV_SCHEMA_HEADER);
-    TableConfig tableConfig =
-        createCSVUpsertTableConfig(getTableName(), getKafkaTopic(), 
getNumKafkaPartitions(), csvDecoderProperties, null,
-            PRIMARY_KEY_COL);
-    addTableConfig(tableConfig);
+    // Push data to Kafka and set up table
+    setupTable(getTableName(), getKafkaTopic(), INPUT_DATA_SMALL_TAR_FILE, 
null);
 
     // Wait for all documents loaded
-    waitForAllDocsLoaded(600_000L);
+    waitForAllDocsLoaded(60_000L);
+    assertEquals(getCurrentCountStarResult(), getCountStarResult());
 
     // Create partial upsert table schema
     Schema partialUpsertSchema = createSchema(PARTIAL_UPSERT_TABLE_SCHEMA);
     addSchema(partialUpsertSchema);
+    _taskManager = _controllerStarter.getTaskManager();
+    _helixTaskResourceManager = 
_controllerStarter.getHelixTaskResourceManager();
   }
 
   @AfterClass
@@ -151,6 +157,15 @@ public class UpsertTableIntegrationTest extends 
BaseClusterIntegrationTestSet {
     return 3;
   }
 
+  @Override
+  protected int getRealtimeSegmentFlushSize() {
+    return 500;
+  }
+
+  private long getCountStarResultWithoutUpsert() {
+    return 10;
+  }
+
   private Schema createSchema(String schemaFileName)
       throws IOException {
     InputStream inputStream = 
BaseClusterIntegrationTest.class.getClassLoader().getResourceAsStream(schemaFileName);
@@ -163,21 +178,33 @@ public class UpsertTableIntegrationTest extends 
BaseClusterIntegrationTestSet {
         .getResultSet(0).getLong(0);
   }
 
-  private long getCountStarResultWithoutUpsert() {
-    return 10;
+  private long queryCountStar(String tableName) {
+    return getPinotConnection().execute("SELECT COUNT(*) FROM " + 
tableName).getResultSet(0).getLong(0);
   }
 
   @Override
-  protected void waitForAllDocsLoaded(long timeoutMs)
-      throws Exception {
+  protected void waitForAllDocsLoaded(long timeoutMs) {
+    waitForAllDocsLoaded(getTableName(), timeoutMs, 
getCountStarResultWithoutUpsert());
+  }
+
+  private void waitForAllDocsLoaded(String tableName, long timeoutMs, long 
expectedCountStarWithoutUpsertResult) {
     TestUtils.waitForCondition(aVoid -> {
       try {
-        return queryCountStarWithoutUpsert(getTableName()) == 
getCountStarResultWithoutUpsert();
+        return queryCountStarWithoutUpsert(tableName) == 
expectedCountStarWithoutUpsertResult;
+      } catch (Exception e) {
+        return null;
+      }
+    }, 100L, timeoutMs, "Failed to load all documents");
+  }
+
+  private void waitForNumQueriedSegmentsToConverge(String tableName, long 
timeoutMs, long expectedNumSegmentsQueried) {
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        return getNumSegmentsQueried(tableName) == expectedNumSegmentsQueried;
       } catch (Exception e) {
         return null;
       }
     }, 100L, timeoutMs, "Failed to load all documents");
-    assertEquals(getCurrentCountStarResult(), getCountStarResult());
   }
 
   @Test
@@ -192,34 +219,16 @@ public class UpsertTableIntegrationTest extends 
BaseClusterIntegrationTestSet {
   protected void testDeleteWithFullUpsert(String kafkaTopicName, String 
tableName, UpsertConfig upsertConfig)
       throws Exception {
     // SETUP
-    // Create table with delete Record column
-    Map<String, String> csvDecoderProperties = 
getCSVDecoderProperties(CSV_DELIMITER, CSV_SCHEMA_HEADER);
-    Schema upsertSchema = createSchema();
-    upsertSchema.setSchemaName(tableName);
-    addSchema(upsertSchema);
-    TableConfig tableConfig =
-        createCSVUpsertTableConfig(tableName, kafkaTopicName, 
getNumKafkaPartitions(), csvDecoderProperties,
-            upsertConfig, PRIMARY_KEY_COL);
-    addTableConfig(tableConfig);
-
-    // Push initial 10 upsert records - 3 pks 100, 101 and 102
-    List<File> dataFiles = unpackTarData(INPUT_DATA_TAR_FILE, _tempDir);
-    pushCsvIntoKafka(dataFiles.get(0), kafkaTopicName, 0);
+    setupTable(tableName, kafkaTopicName, INPUT_DATA_SMALL_TAR_FILE, 
upsertConfig);
 
     // TEST 1: Delete existing primary key
     // Push 2 records with deleted = true - deletes pks 100 and 102
-    List<String> deleteRecords = 
ImmutableList.of("102,Clifford,counter-strike,102,1681054200000,true",
+    List<String> deleteRecords = 
ImmutableList.of("102,Clifford,counter-strike,102,1681254200000,true",
         "100,Zook,counter-strike,2050,1681377200000,true");
     pushCsvIntoKafka(deleteRecords, kafkaTopicName, 0);
 
     // Wait for all docs (12 with skipUpsert=true) to be loaded
-    TestUtils.waitForCondition(aVoid -> {
-      try {
-        return queryCountStarWithoutUpsert(tableName) == 12;
-      } catch (Exception e) {
-        return null;
-      }
-    }, 100L, 600_000L, "Failed to load all upsert records for 
testDeleteWithFullUpsert");
+    waitForAllDocsLoaded(tableName, 600_000L, 12);
 
     // Query for number of records in the table - should only return 1
     ResultSet rs = getPinotConnection().execute("SELECT * FROM " + 
tableName).getResultSet(0);
@@ -252,13 +261,7 @@ public class UpsertTableIntegrationTest extends 
BaseClusterIntegrationTestSet {
     List<String> revivedRecord = 
Collections.singletonList("100,Zook-New,,0.0,1684707335000,false");
     pushCsvIntoKafka(revivedRecord, kafkaTopicName, 0);
     // Wait for the new record (13 with skipUpsert=true) to be indexed
-    TestUtils.waitForCondition(aVoid -> {
-      try {
-        return queryCountStarWithoutUpsert(tableName) == 13;
-      } catch (Exception e) {
-        return null;
-      }
-    }, 100L, 600_000L, "Failed to load all upsert records for 
testDeleteWithFullUpsert");
+    waitForAllDocsLoaded(tableName, 600_000L, 13);
 
     // Validate: pk is queryable and all columns are overwritten with new value
     rs = getPinotConnection().execute("SELECT playerId, name, game FROM " + 
tableName + " WHERE playerId = 100")
@@ -278,6 +281,27 @@ public class UpsertTableIntegrationTest extends 
BaseClusterIntegrationTestSet {
     dropRealtimeTable(tableName);
   }
 
+  private TableConfig setupTable(String tableName, String kafkaTopicName, 
String inputDataFile,
+      UpsertConfig upsertConfig)
+      throws Exception {
+    // SETUP
+    // Create table with the delete record column
+    Map<String, String> csvDecoderProperties = 
getCSVDecoderProperties(CSV_DELIMITER, CSV_SCHEMA_HEADER);
+    Schema upsertSchema = createSchema();
+    upsertSchema.setSchemaName(tableName);
+    addSchema(upsertSchema);
+    TableConfig tableConfig =
+        createCSVUpsertTableConfig(tableName, kafkaTopicName, 
getNumKafkaPartitions(), csvDecoderProperties,
+            upsertConfig, PRIMARY_KEY_COL);
+    addTableConfig(tableConfig);
+
+    // Push the initial upsert records from the given input file - 3 pks 100, 101 and 102
+    List<File> dataFiles = unpackTarData(inputDataFile, _tempDir);
+    pushCsvIntoKafka(dataFiles.get(0), kafkaTopicName, 0);
+
+    return tableConfig;
+  }
+
   @Test
   public void testDeleteWithPartialUpsert()
       throws Exception {
@@ -319,13 +343,7 @@ public class UpsertTableIntegrationTest extends 
BaseClusterIntegrationTestSet {
     pushCsvIntoKafka(deleteRecords, kafkaTopicName, 0);
 
     // Wait for all docs (12 with skipUpsert=true) to be loaded
-    TestUtils.waitForCondition(aVoid -> {
-      try {
-        return queryCountStarWithoutUpsert(tableName) == 12;
-      } catch (Exception e) {
-        return null;
-      }
-    }, 100L, 600_000L, "Failed to load all upsert records for 
testDeleteWithFullUpsert");
+    waitForAllDocsLoaded(tableName, 600_000L, 12);
 
     // Query for number of records in the table - should only return 1
     ResultSet rs = getPinotConnection().execute("SELECT * FROM " + 
tableName).getResultSet(0);
@@ -358,13 +376,7 @@ public class UpsertTableIntegrationTest extends 
BaseClusterIntegrationTestSet {
     List<String> revivedRecord = 
Collections.singletonList("100,Zook,,0.0,1684707335000,false");
     pushCsvIntoKafka(revivedRecord, kafkaTopicName, 0);
     // Wait for the new record (13 with skipUpsert=true) to be indexed
-    TestUtils.waitForCondition(aVoid -> {
-      try {
-        return queryCountStarWithoutUpsert(tableName) == 13;
-      } catch (Exception e) {
-        return null;
-      }
-    }, 100L, 600_000L, "Failed to load all upsert records for 
testDeleteWithFullUpsert");
+    waitForAllDocsLoaded(tableName, 600_000L, 13);
 
     // Validate: pk is queryable and all columns are overwritten with new value
     rs = getPinotConnection().execute("SELECT playerId, name, game FROM " + 
tableName + " WHERE playerId = 100")
@@ -435,6 +447,157 @@ public class UpsertTableIntegrationTest extends 
BaseClusterIntegrationTestSet {
       if (serverStarter != null) {
         serverStarter.stop();
       }
+      // Re-initialize the executor for SegmentBuildTimeLeaseExtender to avoid a NullPointerException in
+      // the other tests.
+      SegmentBuildTimeLeaseExtender.initExecutor();
     }
   }
+
+  @Test
+  public void testUpsertCompaction()
+      throws Exception {
+    final UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+    upsertConfig.setDeleteRecordColumn(DELETE_COL);
+    String tableName = "gameScoresWithCompaction";
+    TableConfig tableConfig =
+        setupTable(tableName, getKafkaTopic() + "-with-compaction", 
INPUT_DATA_LARGE_TAR_FILE, upsertConfig);
+    tableConfig.setTaskConfig(getCompactionTaskConfig());
+    updateTableConfig(tableConfig);
+    waitForAllDocsLoaded(tableName, 600_000L, 1000);
+    assertEquals(getScore(tableName), 3692);
+    waitForNumQueriedSegmentsToConverge(tableName, 10_000L, 3);
+
+    
assertNotNull(_taskManager.scheduleTasks(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName))
+        .get(MinionConstants.UpsertCompactionTask.TASK_TYPE));
+    waitForTaskToComplete();
+    waitForAllDocsLoaded(tableName, 600_000L, 3);
+    assertEquals(getScore(tableName), 3692);
+    waitForNumQueriedSegmentsToConverge(tableName, 10_000L, 3);
+
+    // TEARDOWN
+    dropRealtimeTable(tableName);
+  }
+
+  @Test
+  public void testUpsertCompactionDeletesSegments()
+      throws Exception {
+    final UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+    upsertConfig.setDeleteRecordColumn(DELETE_COL);
+    String tableName = "gameScoresWithCompactionDeleteSegments";
+    String kafkaTopicName = getKafkaTopic() + 
"-with-compaction-segment-delete";
+    TableConfig tableConfig = setupTable(tableName, kafkaTopicName, 
INPUT_DATA_LARGE_TAR_FILE, upsertConfig);
+    tableConfig.setTaskConfig(getCompactionTaskConfig());
+    updateTableConfig(tableConfig);
+
+    // Push data one more time
+    List<File> dataFiles = unpackTarData(INPUT_DATA_LARGE_TAR_FILE, _tempDir);
+    pushCsvIntoKafka(dataFiles.get(0), kafkaTopicName, 0);
+    waitForAllDocsLoaded(tableName, 600_000L, 2000);
+    assertEquals(getScore(tableName), 3692);
+    waitForNumQueriedSegmentsToConverge(tableName, 10_000L, 5);
+
+    
assertNotNull(_taskManager.scheduleTasks(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName))
+        .get(MinionConstants.UpsertCompactionTask.TASK_TYPE));
+    waitForTaskToComplete();
+    waitForAllDocsLoaded(tableName, 600_000L, 3);
+    assertEquals(getScore(tableName), 3692);
+    waitForNumQueriedSegmentsToConverge(tableName, 10_000L, 3);
+    // TEARDOWN
+    dropRealtimeTable(tableName);
+  }
+
+  @Test
+  public void testUpsertCompactionWithSoftDelete()
+      throws Exception {
+    final UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+    upsertConfig.setDeleteRecordColumn(DELETE_COL);
+    String tableName = "gameScoresWithCompactionWithSoftDelete";
+    String kafkaTopicName = getKafkaTopic() + "-with-compaction-delete";
+    TableConfig tableConfig = setupTable(tableName, kafkaTopicName, 
INPUT_DATA_LARGE_TAR_FILE, upsertConfig);
+    tableConfig.setTaskConfig(getCompactionTaskConfig());
+    updateTableConfig(tableConfig);
+
+    // Push data one more time
+    List<File> dataFiles = unpackTarData(INPUT_DATA_LARGE_TAR_FILE, _tempDir);
+    pushCsvIntoKafka(dataFiles.get(0), kafkaTopicName, 0);
+    waitForAllDocsLoaded(tableName, 600_000L, 2000);
+    assertEquals(getScore(tableName), 3692);
+    waitForNumQueriedSegmentsToConverge(tableName, 10_000L, 5);
+    assertEquals(queryCountStar(tableName), 3);
+
+    // Push data to delete 2 rows
+    List<String> deleteRecords = 
ImmutableList.of("102,Clifford,counter-strike,102,1681254200000,true",
+        "100,Zook,counter-strike,2050,1681377200000,true");
+    pushCsvIntoKafka(deleteRecords, kafkaTopicName, 0);
+    waitForAllDocsLoaded(tableName, 600_000L, 2002);
+    assertEquals(queryCountStar(tableName), 1);
+
+    // Run segment compaction. This time, we expect the deleted rows to still be present because they
+    // are part of the consuming segment.
+    
assertNotNull(_taskManager.scheduleTasks(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName))
+        .get(MinionConstants.UpsertCompactionTask.TASK_TYPE));
+    waitForTaskToComplete();
+    waitForAllDocsLoaded(tableName, 600_000L, 3);
+    assertEquals(getScore(tableName), 3692);
+    waitForNumQueriedSegmentsToConverge(tableName, 10_000L, 2);
+    assertEquals(queryCountStar(tableName), 1);
+
+    // Push data again so that the delete records get flushed into a completed segment
+    pushCsvIntoKafka(dataFiles.get(0), kafkaTopicName, 0);
+    waitForAllDocsLoaded(tableName, 600_000L, 1003);
+    assertEquals(getScore(tableName), 3692);
+    waitForNumQueriedSegmentsToConverge(tableName, 10_000L, 4);
+    assertEquals(queryCountStar(tableName), 1);
+    assertEquals(getNumDeletedRows(tableName), 2);
+
+    // Run segment compaction. This time, we expect the deleted rows to be cleaned up.
+    
assertNotNull(_taskManager.scheduleTasks(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName))
+        .get(MinionConstants.UpsertCompactionTask.TASK_TYPE));
+    waitForTaskToComplete();
+    waitForAllDocsLoaded(tableName, 600_000L, 3);
+    assertEquals(getScore(tableName), 3692);
+    waitForNumQueriedSegmentsToConverge(tableName, 10_000L, 2);
+    assertEquals(queryCountStar(tableName), 1);
+    assertEquals(getNumDeletedRows(tableName), 0);
+
+    // TEARDOWN
+    dropRealtimeTable(tableName);
+  }
+
+  private long getScore(String tableName) {
+    return (long) getPinotConnection().execute("SELECT score FROM " + 
tableName + " WHERE playerId = 101")
+        .getResultSet(0).getFloat(0);
+  }
+
+  private long getNumSegmentsQueried(String tableName) {
+    return getPinotConnection().execute("SELECT COUNT(*) FROM " + 
tableName).getExecutionStats()
+        .getNumSegmentsQueried();
+  }
+
+  private long getNumDeletedRows(String tableName) {
+    return getPinotConnection().execute(
+            "SELECT COUNT(*) FROM " + tableName + " WHERE deleted = true 
OPTION(skipUpsert=true)").getResultSet(0)
+        .getLong(0);
+  }
+
+  private void waitForTaskToComplete() {
+    TestUtils.waitForCondition(input -> {
+      // Check task state
+      for (TaskState taskState : 
_helixTaskResourceManager.getTaskStates(MinionConstants.UpsertCompactionTask.TASK_TYPE)
+          .values()) {
+        if (taskState != TaskState.COMPLETED) {
+          return false;
+        }
+      }
+      return true;
+    }, 600_000L, "Failed to complete task");
+  }
+
+  private TableTaskConfig getCompactionTaskConfig() {
+    Map<String, String> tableTaskConfigs = new HashMap<>();
+    
tableTaskConfigs.put(MinionConstants.UpsertCompactionTask.BUFFER_TIME_PERIOD_KEY,
 "0d");
+    
tableTaskConfigs.put(MinionConstants.UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_COUNT,
 "1");
+    return new TableTaskConfig(
+        
Collections.singletonMap(MinionConstants.UpsertCompactionTask.TASK_TYPE, 
tableTaskConfigs));
+  }
 }
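
The compaction tests above attach the task through a TableTaskConfig and trigger it with the controller's PinotTaskManager. A standalone sketch of that config, assembled only from the keys and constructors shown in the diff (the surrounding table-config and scheduling helpers are assumed):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.pinot.core.common.MinionConstants;
    import org.apache.pinot.spi.config.table.TableTaskConfig;

    // Sketch: register UpsertCompactionTask with a zero buffer period and a low
    // invalid-records threshold so compaction kicks in quickly in tests.
    Map<String, String> taskConfigs = new HashMap<>();
    taskConfigs.put(MinionConstants.UpsertCompactionTask.BUFFER_TIME_PERIOD_KEY, "0d");
    taskConfigs.put(MinionConstants.UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_COUNT, "1");
    TableTaskConfig taskConfig = new TableTaskConfig(
        Collections.singletonMap(MinionConstants.UpsertCompactionTask.TASK_TYPE, taskConfigs));
    // tableConfig.setTaskConfig(taskConfig); the tests then schedule the task via
    // PinotTaskManager.scheduleTasks(realtimeTableName) and wait for COMPLETED task states.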
diff --git 
a/pinot-integration-tests/src/test/resources/gameScores_large_csv.tar.gz 
b/pinot-integration-tests/src/test/resources/gameScores_large_csv.tar.gz
new file mode 100644
index 0000000000..e9db06acf7
Binary files /dev/null and 
b/pinot-integration-tests/src/test/resources/gameScores_large_csv.tar.gz differ
diff --git 
a/pinot-integration-tests/src/test/resources/upsert_compaction_test.tar.gz 
b/pinot-integration-tests/src/test/resources/upsert_compaction_test.tar.gz
deleted file mode 100644
index 76b0a50126..0000000000
Binary files 
a/pinot-integration-tests/src/test/resources/upsert_compaction_test.tar.gz and 
/dev/null differ
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
index c9654dd2a7..2ad9b415d2 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
@@ -105,8 +105,8 @@ public class RealtimeToOfflineSegmentsTaskGenerator extends 
BaseTaskGenerator {
       Map<String, TaskState> incompleteTasks =
           TaskGeneratorUtils.getIncompleteTasks(taskType, realtimeTableName, 
_clusterInfoAccessor);
       if (!incompleteTasks.isEmpty()) {
-        LOGGER.warn("Found incomplete tasks: {} for same table: {}. Skipping 
task generation.",
-            incompleteTasks.keySet(), realtimeTableName);
+        LOGGER.warn("Found incomplete tasks: {} for same table: {} and task 
type: {}. Skipping task generation.",
+            incompleteTasks.keySet(), realtimeTableName, taskType);
         continue;
       }
 
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
index e6eaf3679e..210e892434 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
@@ -26,11 +26,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import org.apache.helix.task.TaskState;
 import org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.restlet.resources.ValidDocIdMetadataInfo;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
+import 
org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtils;
 import org.apache.pinot.controller.util.ServerSegmentMetadataReader;
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.common.MinionConstants.UpsertCompactionTask;
@@ -97,7 +99,14 @@ public class UpsertCompactionTaskGenerator extends 
BaseTaskGenerator {
         continue;
       }
 
-      // TODO: add a check to see if the task is already running for the table
+      // Only schedule one task of this type per table at a time
+      Map<String, TaskState> incompleteTasks =
+          TaskGeneratorUtils.getIncompleteTasks(taskType, tableNameWithType, 
_clusterInfoAccessor);
+      if (!incompleteTasks.isEmpty()) {
+        LOGGER.warn("Found incomplete tasks: {} for same table: {} and task 
type: {}. Skipping task generation.",
+            incompleteTasks.keySet(), tableNameWithType, taskType);
+        continue;
+      }
 
       // get server to segment mappings
       PinotHelixResourceManager pinotHelixResourceManager = 
_clusterInfoAccessor.getPinotHelixResourceManager();
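
The hunk above replaces the TODO with the same guard that RealtimeToOfflineSegmentsTaskGenerator already uses: skip a table while an earlier UpsertCompactionTask for it is unfinished. A condensed sketch of that guard, with the generator's per-table loop, logger, and fields assumed from the surrounding code:

    // Inside the per-table loop of UpsertCompactionTaskGenerator.generateTasks():
    Map<String, TaskState> incompleteTasks =
        TaskGeneratorUtils.getIncompleteTasks(taskType, tableNameWithType, _clusterInfoAccessor);
    if (!incompleteTasks.isEmpty()) {
      // An earlier task of this type is still running for this table; do not schedule another one.
      LOGGER.warn("Found incomplete tasks: {} for same table: {} and task type: {}. Skipping task generation.",
          incompleteTasks.keySet(), tableNameWithType, taskType);
      continue;
    }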
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index 29f90715da..20bba92ffb 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -500,15 +500,22 @@ public class TablesResource {
             String.format("Table %s segment %s is not a immutable segment", 
tableNameWithType, segmentName),
             Response.Status.BAD_REQUEST);
       }
-      MutableRoaringBitmap validDocIds =
-          indexSegment.getValidDocIds() != null ? 
indexSegment.getValidDocIds().getMutableRoaringBitmap() : null;
-      if (validDocIds == null) {
+
+      // Adopt the same logic as query execution ('FilterPlanNode.run()') to get the valid doc ids:
+      // if queryableDocIds is available (upsert delete is enabled), read the snapshot from it;
+      // otherwise, fall back to validDocIds.
+      final MutableRoaringBitmap validDocIdSnapshot;
+      if (indexSegment.getQueryableDocIds() != null) {
+        validDocIdSnapshot = 
indexSegment.getQueryableDocIds().getMutableRoaringBitmap();
+      } else if (indexSegment.getValidDocIds() != null) {
+        validDocIdSnapshot = 
indexSegment.getValidDocIds().getMutableRoaringBitmap();
+      } else {
         throw new WebApplicationException(
             String.format("Missing validDocIds for table %s segment %s does 
not exist", tableNameWithType, segmentName),
             Response.Status.NOT_FOUND);
       }
 
-      byte[] validDocIdsBytes = RoaringBitmapUtils.serialize(validDocIds);
+      byte[] validDocIdsBytes = 
RoaringBitmapUtils.serialize(validDocIdSnapshot);
       Response.ResponseBuilder builder = Response.ok(validDocIdsBytes);
       builder.header(HttpHeaders.CONTENT_LENGTH, validDocIdsBytes.length);
       return builder.build();
@@ -580,17 +587,25 @@ public class TablesResource {
           LOGGER.warn(msg);
           continue;
         }
-        MutableRoaringBitmap validDocIds =
-            indexSegment.getValidDocIds() != null ? 
indexSegment.getValidDocIds().getMutableRoaringBitmap() : null;
-        if (validDocIds == null) {
+
+        // Adopt the same logic as query execution ('FilterPlanNode.run()') to get the valid doc ids:
+        // if queryableDocIds is available (upsert delete is enabled), read the snapshot from it;
+        // otherwise, fall back to validDocIds.
+        final MutableRoaringBitmap validDocIdSnapshot;
+        if (indexSegment.getQueryableDocIds() != null) {
+          validDocIdSnapshot = 
indexSegment.getQueryableDocIds().getMutableRoaringBitmap();
+        } else if (indexSegment.getValidDocIds() != null) {
+          validDocIdSnapshot = 
indexSegment.getValidDocIds().getMutableRoaringBitmap();
+        } else {
           String msg = String.format("Missing validDocIds for table %s segment 
%s does not exist", tableNameWithType,
               segmentDataManager.getSegmentName());
           LOGGER.warn(msg);
           throw new WebApplicationException(msg, Response.Status.NOT_FOUND);
         }
+
         Map<String, Object> validDocIdMetadata = new HashMap<>();
         int totalDocs = indexSegment.getSegmentMetadata().getTotalDocs();
-        int totalValidDocs = validDocIds.getCardinality();
+        int totalValidDocs = validDocIdSnapshot.getCardinality();
         int totalInvalidDocs = totalDocs - totalValidDocs;
         validDocIdMetadata.put("segmentName", 
segmentDataManager.getSegmentName());
         validDocIdMetadata.put("totalDocs", totalDocs);
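
Both TablesResource hunks apply the same fallback when taking the doc-id snapshot: prefer queryableDocIds (which excludes soft-deleted rows) and fall back to validDocIds, matching what query execution does in FilterPlanNode.run(). A condensed sketch, with the segment handle and response plumbing assumed from the surrounding code:

    // Sketch of the snapshot selection used by both endpoints in TablesResource:
    final MutableRoaringBitmap validDocIdSnapshot;
    if (indexSegment.getQueryableDocIds() != null) {
      // Upsert delete is enabled: queryable doc ids already exclude soft-deleted rows.
      validDocIdSnapshot = indexSegment.getQueryableDocIds().getMutableRoaringBitmap();
    } else if (indexSegment.getValidDocIds() != null) {
      validDocIdSnapshot = indexSegment.getValidDocIds().getMutableRoaringBitmap();
    } else {
      throw new WebApplicationException("Missing validDocIds for segment " + segmentName,
          Response.Status.NOT_FOUND);
    }
    byte[] validDocIdsBytes = RoaringBitmapUtils.serialize(validDocIdSnapshot);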


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

