This is an automated email from the ASF dual-hosted git repository. georgew5656 pushed a commit to branch addAzureKillTest in repository https://gitbox.apache.org/repos/asf/druid.git
commit a0afd789b3ddfd4ef8ad040f7a5eb5ab462fa8ee Author: George Wu <[email protected]> AuthorDate: Mon Feb 5 15:31:35 2024 -0500 Add kill test --- .../AbstractAzureInputSourceParallelIndexTest.java | 23 ++++++++++++++++++++++ .../AbstractCloudInputSourceParallelIndexTest.java | 4 +++- .../indexer/ITAzureV2ParallelIndexTest.java | 8 ++++++++ .../msq/ITAzureV2SQLBasedIngestionTest.java | 1 + .../apache/druid/testsEx/utils/AzureTestUtil.java | 19 ++++++++++++++++++ 5 files changed, 54 insertions(+), 1 deletion(-) diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractAzureInputSourceParallelIndexTest.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractAzureInputSourceParallelIndexTest.java index 81852df11c7..d72f6702136 100644 --- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractAzureInputSourceParallelIndexTest.java +++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractAzureInputSourceParallelIndexTest.java @@ -19,11 +19,16 @@ package org.apache.druid.testsEx.indexer; +import com.google.common.collect.ImmutableList; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.testsEx.utils.AzureTestUtil; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; +import java.net.URI; +import java.util.List; + import static org.junit.Assert.fail; /** @@ -69,4 +74,22 @@ public class AbstractAzureInputSourceParallelIndexTest extends AbstractCloudInpu LOG.warn(e, "Unable to delete container in azure"); } } + + public static void validateAzureSegmentFilesDeleted(String path) + { + List<URI> segmentFiles = ImmutableList.of(); + try { + segmentFiles = azure.listFiles(path); + } + catch (Exception e) { + LOG.warn(e, "Failed to validate that azure segment files were deleted."); + } + finally { + Assert.assertEquals( + "Some segment files were not deleted: " + segmentFiles.toString(), + segmentFiles.size(), + 0 + ); + } + } } diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractCloudInputSourceParallelIndexTest.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractCloudInputSourceParallelIndexTest.java index a920d928b76..740601a9f9d 100644 --- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractCloudInputSourceParallelIndexTest.java +++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractCloudInputSourceParallelIndexTest.java @@ -134,8 +134,9 @@ public abstract class AbstractCloudInputSourceParallelIndexTest extends Abstract * @param segmentAvailabilityConfirmationPair set lhs in the pair to true if you want to confirm that the task waited longer than 0ms for the task to complete. * set rhs to true to verify that the segment is actually available. * @param inputSourceType Input source type (eg : s3, gcs, azure) + * @return The datasource used to test. */ - void doTest( + String doTest( Pair<String, List<?>> inputSource, Pair<Boolean, Boolean> segmentAvailabilityConfirmationPair, String inputSourceType @@ -200,6 +201,7 @@ public abstract class AbstractCloudInputSourceParallelIndexTest extends Abstract true, segmentAvailabilityConfirmationPair ); + return indexDatasource; } } diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITAzureV2ParallelIndexTest.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITAzureV2ParallelIndexTest.java index df539eb5903..af23a623f18 100644 --- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITAzureV2ParallelIndexTest.java +++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITAzureV2ParallelIndexTest.java @@ -49,4 +49,12 @@ public class ITAzureV2ParallelIndexTest extends AbstractAzureInputSourceParallel { doTest(azureInputSource, new Pair<>(false, false), "azureStorage"); } + + @Test + @Parameters(method = "resources") + public void testAzureIndexData_kill(Pair<String, List<?>> azureInputSource) throws Exception + { + String dataSource = doTest(azureInputSource, new Pair<>(false, false), "azureStorage"); + AbstractAzureInputSourceParallelIndexTest.validateAzureSegmentFilesDeleted("segments" + "/" + dataSource); + } } diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITAzureV2SQLBasedIngestionTest.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITAzureV2SQLBasedIngestionTest.java index 31486f5f4ed..294f04dec7c 100644 --- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITAzureV2SQLBasedIngestionTest.java +++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITAzureV2SQLBasedIngestionTest.java @@ -53,5 +53,6 @@ public class ITAzureV2SQLBasedIngestionTest extends AbstractAzureInputSourcePara public void testSQLBasedBatchIngestion(Pair<String, List<?>> azureStorageInputSource) { doMSQTest(azureStorageInputSource, CLOUD_INGEST_SQL, INDEX_QUERIES_FILE, "azureStorage"); + } } diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/utils/AzureTestUtil.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/utils/AzureTestUtil.java index 90be0cbd464..a43fdd48826 100644 --- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/utils/AzureTestUtil.java +++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/utils/AzureTestUtil.java @@ -29,10 +29,13 @@ import org.apache.druid.testing.utils.ITRetryUtil; import java.io.File; import java.io.IOException; +import java.net.URI; import java.net.URISyntaxException; import java.nio.file.Files; import java.security.InvalidKeyException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; public class AzureTestUtil { @@ -130,4 +133,20 @@ public class AzureTestUtil LOG.info("Uploading file " + DRUID_CLOUD_PATH + '/' + source.getName() + " in azure container " + AZURE_CONTAINER); blob.upload(Files.newInputStream(source.toPath()), source.length()); } + + /** + * Get a list of files under a path to be used for verification of kill tasks. + * + * @param filePath path to look for files under + */ + public List<URI> listFiles(String filePath) throws URISyntaxException, StorageException + { + // Retrieve reference to a previously created container. + CloudBlobContainer container = azureStorageClient.getContainerReference(AZURE_CONTAINER); + List<URI> activeFiles = new ArrayList<>(); + container.listBlobs(DRUID_CLOUD_PATH + '/' + filePath).iterator().forEachRemaining( + blob -> activeFiles.add(blob.getUri()) + ); + return activeFiles; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
