This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f4c85e9596 Fix the flaky UpsertTableSegmentUploadIntegrationTest (#8675)
f4c85e9596 is described below
commit f4c85e95969b0bc7276ec18ae118051af93e9f2c
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Tue May 10 14:25:13 2022 -0700
Fix the flaky UpsertTableSegmentUploadIntegrationTest (#8675)
---
.../pinot/integration/tests/ClusterTest.java | 95 ++++-----
.../tests/OfflineClusterIntegrationTest.java | 10 +-
.../UpsertTableSegmentUploadIntegrationTest.java | 218 +++++++++------------
3 files changed, 134 insertions(+), 189 deletions(-)
diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index 35132c3bb8..e940fd0f06 100644
--- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -67,6 +67,7 @@ import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordExtractor;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.stream.StreamMessageDecoder;
+import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Broker;
import org.apache.pinot.spi.utils.CommonConstants.Helix;
import org.apache.pinot.spi.utils.CommonConstants.Minion;
@@ -299,28 +300,47 @@ public abstract class ClusterTest extends ControllerTest {
/**
* Upload all segments inside the given directory to the cluster.
- *
- * @param tarDir Segment directory
*/
protected void uploadSegments(String tableName, File tarDir)
throws Exception {
- File[] segmentTarFiles = tarDir.listFiles();
- assertNotNull(segmentTarFiles);
- int numSegments = segmentTarFiles.length;
+ uploadSegments(tableName, TableType.OFFLINE, tarDir);
+ }
+
+ /**
+ * Upload all segments inside the given directory to the cluster.
+ */
+ protected void uploadSegments(String tableName, TableType tableType, File
tarDir)
+ throws Exception {
+ uploadSegments(tableName, tableType, Collections.singletonList(tarDir));
+ }
+
+ /**
+ * Upload all segments inside the given directories to the cluster.
+ */
+ protected void uploadSegments(String tableName, TableType tableType,
List<File> tarDirs)
+ throws Exception {
+ List<File> segmentTarFiles = new ArrayList<>();
+ for (File tarDir : tarDirs) {
+ File[] tarFiles = tarDir.listFiles();
+ assertNotNull(tarFiles);
+ Collections.addAll(segmentTarFiles, tarFiles);
+ }
+ int numSegments = segmentTarFiles.size();
assertTrue(numSegments > 0);
- URI uploadSegmentHttpURI =
FileUploadDownloadClient.getUploadSegmentHttpURI(LOCAL_HOST, _controllerPort);
+ URI uploadSegmentHttpURI =
+
FileUploadDownloadClient.getUploadSegmentURI(CommonConstants.HTTP_PROTOCOL,
LOCAL_HOST, _controllerPort);
try (FileUploadDownloadClient fileUploadDownloadClient = new
FileUploadDownloadClient()) {
if (numSegments == 1) {
- File segmentTarFile = segmentTarFiles[0];
+ File segmentTarFile = segmentTarFiles.get(0);
if (System.currentTimeMillis() % 2 == 0) {
assertEquals(
fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI,
segmentTarFile.getName(), segmentTarFile,
- tableName).getStatusCode(), HttpStatus.SC_OK);
+ tableName, tableType).getStatusCode(), HttpStatus.SC_OK);
} else {
assertEquals(
- uploadSegmentWithOnlyMetadata(tableName, uploadSegmentHttpURI,
fileUploadDownloadClient, segmentTarFile),
- HttpStatus.SC_OK);
+ uploadSegmentWithOnlyMetadata(tableName, tableType,
uploadSegmentHttpURI, fileUploadDownloadClient,
+ segmentTarFile), HttpStatus.SC_OK);
}
} else {
// Upload all segments in parallel
@@ -330,9 +350,9 @@ public abstract class ClusterTest extends ControllerTest {
futures.add(executorService.submit(() -> {
if (System.currentTimeMillis() % 2 == 0) {
return
fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI,
segmentTarFile.getName(),
- segmentTarFile, tableName).getStatusCode();
+ segmentTarFile, tableName, tableType).getStatusCode();
} else {
- return uploadSegmentWithOnlyMetadata(tableName,
uploadSegmentHttpURI, fileUploadDownloadClient,
+ return uploadSegmentWithOnlyMetadata(tableName, tableType,
uploadSegmentHttpURI, fileUploadDownloadClient,
segmentTarFile);
}
}));
@@ -345,60 +365,19 @@ public abstract class ClusterTest extends ControllerTest {
}
}
- /**
- * tarDirPaths contains a list of directories that contain segment files.
API uploads all segments inside the given
- * list of directories to the cluster.
- *
- * @param tarDirPaths List of directories containing segments
- */
- protected void uploadSegments(String tableName, List<File> tarDirPaths,
TableType tableType,
- boolean enableParallelPushProtection)
- throws Exception {
- List<File> segmentTarFiles = new ArrayList<>();
-
- for (File tarDir : tarDirPaths) {
- Collections.addAll(segmentTarFiles, tarDir.listFiles());
- }
- assertNotNull(segmentTarFiles);
- int numSegments = segmentTarFiles.size();
- assertTrue(numSegments > 0);
-
- URI uploadSegmentHttpURI =
FileUploadDownloadClient.getUploadSegmentHttpURI(LOCAL_HOST, _controllerPort);
- try (FileUploadDownloadClient fileUploadDownloadClient = new
FileUploadDownloadClient()) {
- if (numSegments == 1) {
- File segmentTarFile = segmentTarFiles.get(0);
- assertEquals(
- fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI,
segmentTarFile.getName(), segmentTarFile,
- tableName, tableType.OFFLINE, enableParallelPushProtection,
true).getStatusCode(), HttpStatus.SC_OK);
- } else {
- // Upload all segments in parallel
- ExecutorService executorService =
Executors.newFixedThreadPool(numSegments);
- List<Future<Integer>> futures = new ArrayList<>(numSegments);
- for (File segmentTarFile : segmentTarFiles) {
- futures.add(executorService.submit(() -> {
- return
fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI,
segmentTarFile.getName(),
- segmentTarFile, tableName, tableType.OFFLINE,
enableParallelPushProtection, true).getStatusCode();
- }));
- }
- executorService.shutdown();
- for (Future<Integer> future : futures) {
- assertEquals((int) future.get(), HttpStatus.SC_OK);
- }
- }
- }
- }
-
- private int uploadSegmentWithOnlyMetadata(String tableName, URI
uploadSegmentHttpURI,
+ private int uploadSegmentWithOnlyMetadata(String tableName, TableType
tableType, URI uploadSegmentHttpURI,
FileUploadDownloadClient fileUploadDownloadClient, File segmentTarFile)
throws IOException, HttpErrorStatusException {
List<Header> headers = ImmutableList.of(new
BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI,
"file://" + segmentTarFile.getParentFile().getAbsolutePath() + "/" +
URLEncoder.encode(segmentTarFile.getName(),
StandardCharsets.UTF_8.toString())), new
BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE,
FileUploadDownloadClient.FileUploadType.METADATA.toString()));
- // Add table name as a request parameter
+ // Add table name and table type as request parameters
NameValuePair tableNameValuePair =
new
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME,
tableName);
- List<NameValuePair> parameters = Arrays.asList(tableNameValuePair);
+ NameValuePair tableTypeValuePair =
+ new
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_TYPE,
tableType.name());
+ List<NameValuePair> parameters = Arrays.asList(tableNameValuePair,
tableTypeValuePair);
return
fileUploadDownloadClient.uploadSegmentMetadata(uploadSegmentHttpURI,
segmentTarFile.getName(),
segmentTarFile, headers, parameters,
HttpClient.DEFAULT_SOCKET_TIMEOUT_MS).getStatusCode();
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 9bb4853283..0669847093 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -184,13 +184,11 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
File tarDir2 = new File(_tempDir, "tarDir2");
FileUtils.copyDirectory(_tarDir, tarDir2);
- List<File> tarDirPaths = new ArrayList<>();
- tarDirPaths.add(_tarDir);
- tarDirPaths.add(tarDir2);
-
- // TODO: Move this block to a separate method.
+ List<File> tarDirs = new ArrayList<>();
+ tarDirs.add(_tarDir);
+ tarDirs.add(tarDir2);
try {
- uploadSegments(getTableName(), tarDirPaths, TableType.OFFLINE, true);
+ uploadSegments(getTableName(), TableType.OFFLINE, tarDirs);
} catch (Exception e) {
// If enableParallelPushProtection is enabled and the same segment is
uploaded concurrently, we could get one
// of the two exception - 409 conflict of the second call enters
ProcessExistingSegment ; segmentZkMetadata
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
index 616f150d52..e6f5ff248f 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
@@ -20,48 +20,36 @@ package org.apache.pinot.integration.tests;
import java.io.File;
import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import org.apache.commons.io.FileUtils;
-import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
-import org.apache.http.HttpStatus;
-import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.helix.HelixHelper;
-import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
+import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
-import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
public class UpsertTableSegmentUploadIntegrationTest extends
BaseClusterIntegrationTestSet {
- private static final int NUM_BROKERS = 1;
private static final int NUM_SERVERS = 2;
- // Segment 1 contains records of pk value 100000
+ private static final String PRIMARY_KEY_COL = "clientId";
+ private static final String REALTIME_TABLE_NAME =
TableNameBuilder.REALTIME.tableNameWithType(DEFAULT_TABLE_NAME);
+
+ // Segment 1 contains records of pk value 100000 (partition 0)
private static final String UPLOADED_SEGMENT_1 = "mytable_10027_19736_0 %";
- // Segment 2 contains records of pk value 100001
+ // Segment 2 contains records of pk value 100001 (partition 1)
private static final String UPLOADED_SEGMENT_2 = "mytable_10072_19919_1 %";
- // Segment 3 contains records of pk value 100000
+ // Segment 3 contains records of pk value 100002 (partition 1)
private static final String UPLOADED_SEGMENT_3 = "mytable_10158_19938_2 %";
- private static final String PRIMARY_KEY_COL = "clientId";
- private static final String TABLE_NAME_WITH_TYPE = "mytable_REALTIME";
@BeforeClass
public void setUp()
@@ -72,27 +60,24 @@ public class UpsertTableSegmentUploadIntegrationTest extends BaseClusterIntegrat
startZk();
// Start a customized controller with more frequent realtime segment
validation
startController();
- startBrokers(getNumBrokers());
+ startBroker();
startServers(NUM_SERVERS);
- // Start Kafka
- startKafka();
-
- // Create and upload the schema.
- Schema schema = createSchema();
- addSchema(schema);
-
// Unpack the Avro files
List<File> avroFiles = unpackAvroData(_tempDir);
- // Push data to Kafka
+ // Start Kafka and push data into Kafka
+ startKafka();
pushAvroIntoKafka(avroFiles);
- // Create and upload the table config
- TableConfig upsertTableConfig = createUpsertTableConfig(avroFiles.get(0),
PRIMARY_KEY_COL, getNumKafkaPartitions());
- addTableConfig(upsertTableConfig);
+
+ // Create and upload schema and table config
+ Schema schema = createSchema();
+ addSchema(schema);
+ TableConfig tableConfig = createUpsertTableConfig(avroFiles.get(0),
PRIMARY_KEY_COL, getNumKafkaPartitions());
+ addTableConfig(tableConfig);
// Create and upload segments
- ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles,
upsertTableConfig, schema, 0, _segmentDir, _tarDir);
+ ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig,
schema, 0, _segmentDir, _tarDir);
uploadSegments(getTableName(), TableType.REALTIME, _tarDir);
// Wait for all documents loaded
@@ -131,8 +116,9 @@ public class UpsertTableSegmentUploadIntegrationTest extends BaseClusterIntegrat
return true;
}
- protected int getNumBrokers() {
- return NUM_BROKERS;
+ @Override
+ protected String getPartitionColumn() {
+ return PRIMARY_KEY_COL;
}
@Override
@@ -142,113 +128,95 @@ public class UpsertTableSegmentUploadIntegrationTest extends BaseClusterIntegrat
}
@Override
- protected String getPartitionColumn() {
- return PRIMARY_KEY_COL;
+ protected void waitForAllDocsLoaded(long timeoutMs)
+ throws Exception {
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ return getCurrentCountStarResultWithoutUpsert() ==
getCountStarResultWithoutUpsert();
+ } catch (Exception e) {
+ return null;
+ }
+ }, 100L, timeoutMs, "Failed to load all documents");
+ assertEquals(getCurrentCountStarResult(), getCountStarResult());
}
- @Override
- protected void startController()
- throws Exception {
- Map<String, Object> controllerConfig = getDefaultControllerConfiguration();
- // Perform realtime segment validation every second with 1 second initial
delay.
- controllerConfig
-
.put(ControllerConf.ControllerPeriodicTasksConf.DEPRECATED_REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS,
1);
- controllerConfig
-
.put(ControllerConf.ControllerPeriodicTasksConf.DEPRECATED_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS,
1);
- controllerConfig
-
.put(ControllerConf.ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_INITIAL_DELAY_IN_SECONDS,
1);
- startController(controllerConfig);
+ private long getCurrentCountStarResultWithoutUpsert() {
+ return getPinotConnection().execute("SELECT COUNT(*) FROM " +
getTableName() + " OPTION(skipUpsert=true)")
+ .getResultSet(0).getLong(0);
+ }
+
+ private long getCountStarResultWithoutUpsert() {
+ // 3 Avro files, each with 100 documents, one copy from streaming source,
one copy from batch source
+ return 600;
}
@Test
public void testSegmentAssignment()
throws Exception {
- IdealState idealState = HelixHelper.getTableIdealState(_helixManager,
TABLE_NAME_WITH_TYPE);
- Assert.assertEquals(getCurrentCountStarResult(), getCountStarResult());
- verifyTableIdealStates(idealState);
- // Wait 3 seconds to let the realtime validation thread to run.
- Thread.sleep(3000);
- // Verify the result again.
- Assert.assertEquals(getCurrentCountStarResult(), getCountStarResult());
- verifyTableIdealStates(idealState);
-
- // Restart the servers and check every segment is not in ERROR state.
+ verifyIdealState();
+
+ // Run the real-time segment validation and check again
+ _controllerStarter.getRealtimeSegmentValidationManager().run();
+ verifyIdealState();
+ assertEquals(getCurrentCountStarResult(), getCountStarResult());
+ assertEquals(getCurrentCountStarResultWithoutUpsert(),
getCountStarResultWithoutUpsert());
+
+ // Restart the servers and check again
restartServers();
- verifyTableIdealStates(idealState);
- ExternalView ev =
- HelixHelper.getExternalViewForResource(_helixAdmin,
this.getHelixClusterName(), TABLE_NAME_WITH_TYPE);
- Set<String> segments = ev.getPartitionSet();
- Assert.assertEquals(segments.size(), 5);
- for (String segment : segments) {
- Map<String, String> stateMap = ev.getStateMap(segment);
- Assert.assertTrue(stateMap.size() > 0);
- for (Map.Entry<String, String> server2state: stateMap.entrySet()) {
- Assert.assertFalse("ERROR".equals(server2state.getValue()));
- }
- }
- // Verify the result again.
- Assert.assertEquals(getCurrentCountStarResult(), getCountStarResult());
+ verifyIdealState();
+ waitForAllDocsLoaded(600_000L);
}
- private void verifyTableIdealStates(IdealState idealState) {
- // Verify various ideal state properties
- Set<String> segments = idealState.getPartitionSet();
- Assert.assertEquals(segments.size(), 5);
- Map<String, Integer> segment2PartitionId = new HashMap<>();
- segment2PartitionId.put(UPLOADED_SEGMENT_1, 0);
- segment2PartitionId.put(UPLOADED_SEGMENT_2, 1);
- segment2PartitionId.put(UPLOADED_SEGMENT_3, 1);
-
- // Verify that all segments of the same partition are mapped to the same
single server.
- Map<Integer, Set<String>> segmentAssignment = new HashMap<>();
- for (String segment : segments) {
- Integer partitionId;
- if (LLCSegmentName.isLowLevelConsumerSegmentName(segment)) {
- partitionId = new LLCSegmentName(segment).getPartitionGroupId();
+ private void verifyIdealState() {
+ IdealState idealState = HelixHelper.getTableIdealState(_helixManager,
REALTIME_TABLE_NAME);
+ Map<String, Map<String, String>> segmentAssignment =
idealState.getRecord().getMapFields();
+ assertEquals(segmentAssignment.size(), 5);
+
+ String serverForPartition0 = null;
+ String serverForPartition1 = null;
+ for (Map.Entry<String, Map<String, String>> entry :
segmentAssignment.entrySet()) {
+ String segmentName = entry.getKey();
+ Map<String, String> instanceStateMap = entry.getValue();
+
+ // Verify that all segments have the correct state
+ assertEquals(instanceStateMap.size(), 1);
+ Map.Entry<String, String> instanceIdAndState =
instanceStateMap.entrySet().iterator().next();
+ String state = instanceIdAndState.getValue();
+ if (LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)) {
+ assertEquals(state, SegmentStateModel.CONSUMING);
} else {
- partitionId = segment2PartitionId.get(segment);
+ assertEquals(state, SegmentStateModel.ONLINE);
}
- Assert.assertNotNull(partitionId);
- Set<String> instances = idealState.getInstanceSet(segment);
- Assert.assertEquals(1, instances.size());
- if (segmentAssignment.containsKey(partitionId)) {
- Assert.assertEquals(instances, segmentAssignment.get(partitionId));
+
+ // Verify that all segments of the same partition are mapped to the same
server
+ String instanceId = instanceIdAndState.getKey();
+ int partitionId = getSegmentPartitionId(segmentName);
+ if (partitionId == 0) {
+ if (serverForPartition0 == null) {
+ serverForPartition0 = instanceId;
+ } else {
+ assertEquals(instanceId, serverForPartition0);
+ }
} else {
- segmentAssignment.put(partitionId, instances);
+ assertEquals(partitionId, 1);
+ if (serverForPartition1 == null) {
+ serverForPartition1 = instanceId;
+ } else {
+ assertEquals(instanceId, serverForPartition1);
+ }
}
}
}
- private void uploadSegments(String tableName, TableType tableType, File
tarDir)
- throws Exception {
- File[] segmentTarFiles = tarDir.listFiles();
- assertNotNull(segmentTarFiles);
- int numSegments = segmentTarFiles.length;
- assertTrue(numSegments > 0);
-
- URI uploadSegmentHttpURI =
FileUploadDownloadClient.getUploadSegmentHttpURI(LOCAL_HOST, _controllerPort);
- try (FileUploadDownloadClient fileUploadDownloadClient = new
FileUploadDownloadClient()) {
- if (numSegments == 1) {
- File segmentTarFile = segmentTarFiles[0];
- assertEquals(fileUploadDownloadClient
- .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(),
segmentTarFile, tableName, tableType)
- .getStatusCode(), HttpStatus.SC_OK);
- } else {
- // Upload all segments in parallel
- ExecutorService executorService =
Executors.newFixedThreadPool(numSegments);
- List<Future<Integer>> futures = new ArrayList<>(numSegments);
- for (File segmentTarFile : segmentTarFiles) {
- futures.add(executorService.submit(() -> {
- return fileUploadDownloadClient
- .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(),
segmentTarFile, tableName, tableType)
- .getStatusCode();
- }));
- }
- executorService.shutdown();
- for (Future<Integer> future : futures) {
- assertEquals((int) future.get(), HttpStatus.SC_OK);
- }
- }
+ private static int getSegmentPartitionId(String segmentName) {
+ switch (segmentName) {
+ case UPLOADED_SEGMENT_1:
+ return 0;
+ case UPLOADED_SEGMENT_2:
+ case UPLOADED_SEGMENT_3:
+ return 1;
+ default:
+ return new LLCSegmentName(segmentName).getPartitionGroupId();
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]