github-advanced-security[bot] commented on code in PR #19620:
URL: https://github.com/apache/druid/pull/19620#discussion_r3462470269
##########
server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerPartialAcquireTest.java:
##########
@@ -405,6 +478,62 @@
);
}
+ @Test
+ void
testPartialAcquireClusteredWithProjectionMountsProjectionBundleWithoutBase()
+ throws ExecutionException, InterruptedException, IOException
+ {
+ final DataSegment clusteredSegment =
+ DataSegment.builder()
Review Comment:
## CodeQL / Deprecated method or constructor invocation
Invoking [DataSegment.builder](1) should be avoided because it has been
deprecated.
[Show more
details](https://github.com/apache/druid/security/code-scanning/11316)
##########
processing/src/test/java/org/apache/druid/segment/PartialQueryableIndexCursorFactoryClusteredTest.java:
##########
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.druid.common.asyncresource.AsyncResource;
+import org.apache.druid.common.asyncresource.AsyncResources;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.MapBasedInputRow;
+import
org.apache.druid.data.input.impl.ClusteredValueGroupsBaseTableProjectionSpec;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.filter.EqualityFilter;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.file.CountingRangeReader;
+import org.apache.druid.segment.file.PartialSegmentFileMapperV10;
+import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import
org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Partial (on-demand) download coverage for a clustered base-table V10
segment. Cluster groups are resolved from
+ * metadata only ({@code planClusterGroupQuery}); the async cursor factory
downloads one bundle ({@code __base$<ids>})
+ * per surviving group.
+ */
+class PartialQueryableIndexCursorFactoryClusteredTest extends
PartialQueryableIndexCursorFactoryTestBase
+{
+ private static final long T0 = DateTimes.of("2025-01-01").getMillis();
+
+ // tenants sort to dictionary ids acme=0, globex=1 → bundles __base$0 (2
rows) and __base$1 (1 row)
+ private static final String ACME_BUNDLE = "__base$0";
+ private static final String GLOBEX_BUNDLE = "__base$1";
+
+ @TempDir
+ static File sharedTempDir;
+
+ private static File segmentDir;
+
+ @BeforeAll
+ static void buildSegment()
+ {
+ final ClusteredValueGroupsBaseTableProjectionSpec clusterSpec =
+ ClusteredValueGroupsBaseTableProjectionSpec.builder()
+ .columns(
+ new StringDimensionSchema("tenant"),
+ new StringDimensionSchema("region"),
+ new LongDimensionSchema("__time")
+ )
+ .clusteringColumns("tenant")
+ .build();
+ final IncrementalIndexSchema schema =
+ IncrementalIndexSchema.builder()
+ .withMinTimestamp(T0)
+ .withTimestampSpec(new TimestampSpec("ts",
"millis", null))
+ .withQueryGranularity(Granularities.NONE)
+
.withDimensionsSpec(clusterSpec.getDimensionsSpec())
+ .withRollup(false)
+ .withClusterSpec(clusterSpec)
+ .build();
+ final File tmpDir = new File(sharedTempDir, "build_" +
ThreadLocalRandom.current().nextInt());
+ segmentDir = IndexBuilder.create()
+ .useV10()
+ .tmpDir(tmpDir)
+
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+ .schema(schema)
+
.indexSpec(IndexSpec.builder().withMetadataCompression(CompressionStrategy.NONE).build())
+ // ingest out of clustering order; the writer
sorts groups by clustering value
+ .rows(List.of(
+ row(T0 + 2, "globex", "eu-west-1"),
+ row(T0, "acme", "us-east-1"),
+ row(T0 + 1, "acme", "us-west-2")
+ ))
+ .buildMMappedIndexFile();
+ }
+
+ @Test
+ void testFilterOnClusteringColumnDownloadsOnlySurvivingGroup() throws
IOException
+ {
+ final CountingRangeReader rangeReader = new
CountingRangeReader(segmentDir);
+ try (IndexAndMapper opened = openIndex(rangeReader, "filter_acme")) {
+ final PartialQueryableIndexCursorFactory factory =
factory(opened.index());
+ final CursorBuildSpec spec = CursorBuildSpec.builder()
+ .setFilter(new
EqualityFilter("tenant", ColumnType.STRING, "acme", null))
+ .build();
+
+ try (AsyncCursorHolder asyncHolder = factory.makeCursorHolderAsync(spec);
+ CursorHolder holder = asyncHolder.release()) {
+ // Only the acme group survives the clustering-column filter, so only
its rows are returned.
+ Assertions.assertEquals(
+ List.of(List.of("acme", "us-east-1"), List.of("acme",
"us-west-2")),
+ scanTenantRegion(holder)
+ );
+
+ final Set<String> downloaded = opened.mapper().getDownloadedFiles();
+ // The acme group's columns were materialized.
+ Assertions.assertTrue(downloaded.contains(ACME_BUNDLE + "/region"),
"got: " + downloaded);
+ Assertions.assertTrue(downloaded.contains(ACME_BUNDLE + "/__time"),
"got: " + downloaded);
+ // The globex group bundle was pruned by metadata-only resolution and
never touched.
+ Assertions.assertTrue(
+ downloaded.stream().noneMatch(f -> f.startsWith(GLOBEX_BUNDLE +
"/")),
+ "globex group must not be downloaded; got: " + downloaded
+ );
+ }
+ }
+ }
+
+ @Test
+ void testFilterMatchingNoGroupDownloadsNothing() throws IOException
+ {
+ final CountingRangeReader rangeReader = new
CountingRangeReader(segmentDir);
+ try (IndexAndMapper opened = openIndex(rangeReader, "filter_none")) {
+ final PartialQueryableIndexCursorFactory factory =
factory(opened.index());
+ final CursorBuildSpec spec = CursorBuildSpec.builder()
+ .setFilter(new
EqualityFilter("tenant", ColumnType.STRING, "nobody", null))
+ .build();
+
+ try (AsyncCursorHolder asyncHolder = factory.makeCursorHolderAsync(spec);
+ CursorHolder holder = asyncHolder.release()) {
+ Assertions.assertTrue(scanTenantRegion(holder).isEmpty(), "no group
should survive");
+ Assertions.assertTrue(
+ opened.mapper().getDownloadedFiles().isEmpty(),
+ "no group bundle should be downloaded when nothing survives; got:
" + opened.mapper().getDownloadedFiles()
+ );
+ }
+ }
+ }
+
+ @Test
+ void testFullScanConcatenatesAllGroups() throws IOException
+ {
+ final CountingRangeReader rangeReader = new
CountingRangeReader(segmentDir);
+ try (IndexAndMapper opened = openIndex(rangeReader, "full_scan")) {
+ final PartialQueryableIndexCursorFactory factory =
factory(opened.index());
+
+ try (AsyncCursorHolder asyncHolder =
factory.makeCursorHolderAsync(CursorBuildSpec.FULL_SCAN);
+ CursorHolder holder = asyncHolder.release()) {
+ // Groups are concatenated in clustering (dictionary id) order: acme
then globex, clustering constant injected.
+ Assertions.assertEquals(
+ List.of(
+ List.of("acme", "us-east-1"),
+ List.of("acme", "us-west-2"),
+ List.of("globex", "eu-west-1")
+ ),
+ scanTenantRegion(holder)
+ );
+
+ final Set<String> downloaded = opened.mapper().getDownloadedFiles();
+ Assertions.assertTrue(downloaded.contains(ACME_BUNDLE + "/region"),
"got: " + downloaded);
+ Assertions.assertTrue(downloaded.contains(GLOBEX_BUNDLE + "/region"),
"got: " + downloaded);
+ }
+ }
+ }
+
+ @Test
+ void testSyncMakeCursorHolderAfterFullDownload() throws IOException
+ {
+ final CountingRangeReader rangeReader = new
CountingRangeReader(segmentDir);
+ try (IndexAndMapper opened = openIndex(rangeReader, "sync_full")) {
+ final PartialSegmentFileMapperV10 mapper = opened.mapper();
+ final PartialQueryableIndexCursorFactory factory =
factory(opened.index());
+
+ // Sync path refuses until everything is resident.
+ Assertions.assertThrows(
+ DruidException.class,
+ () -> factory.makeCursorHolder(CursorBuildSpec.FULL_SCAN)
+ );
+
+ // Eager-acquire equivalent: materialize every internal file, then the
sync clustered path works via the delegate.
+
mapper.ensureFilesAvailable(mapper.getSegmentFileMetadata().getFiles().keySet());
+ Assertions.assertTrue(mapper.isFullyDownloaded());
+ try (CursorHolder holder =
factory.makeCursorHolder(CursorBuildSpec.FULL_SCAN)) {
+ Assertions.assertEquals(
+ List.of(
+ List.of("acme", "us-east-1"),
+ List.of("acme", "us-west-2"),
+ List.of("globex", "eu-west-1")
+ ),
+ scanTenantRegion(holder)
+ );
+ }
+ }
+ }
+
+ @Test
+ void testAsyncMultiGroupDefersDownloadUntilExecutorRuns() throws Exception
+ {
+ final CountingRangeReader rangeReader = new
CountingRangeReader(segmentDir);
+ final CountDownLatch gate = new CountDownLatch(1);
+ final ExecutorService rawExec =
Execs.singleThreaded("partial-clustered-defer-%d");
+ final ListeningExecutorService gatedExec =
MoreExecutors.listeningDecorator(rawExec);
+ try (IndexAndMapper opened = openIndex(rangeReader, "async_defer")) {
+ final PartialQueryableIndexCursorFactory factory = new
PartialQueryableIndexCursorFactory(
+ opened.index(),
+ QueryableIndexTimeBoundaryInspector.create(opened.index()),
+ noOpAcquirer(gatedExec)
+ );
+ rangeReader.resetCount();
+
+ // Block the download executor so no group download can start yet.
+ @SuppressWarnings("unused")
+ ListenableFuture<?> unused = gatedExec.submit(() -> {
+ try {
+ gate.await();
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ });
Review Comment:
## CodeQL / Unread local variable
Variable 'ListenableFuture<?> unused' is never read.
[Show more
details](https://github.com/apache/druid/security/code-scanning/11317)
##########
processing/src/test/java/org/apache/druid/segment/PartialQueryableIndexCursorFactoryClusteredTest.java:
##########
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.druid.common.asyncresource.AsyncResource;
+import org.apache.druid.common.asyncresource.AsyncResources;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.MapBasedInputRow;
+import
org.apache.druid.data.input.impl.ClusteredValueGroupsBaseTableProjectionSpec;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.filter.EqualityFilter;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.file.CountingRangeReader;
+import org.apache.druid.segment.file.PartialSegmentFileMapperV10;
+import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import
org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Partial (on-demand) download coverage for a clustered base-table V10
segment. Cluster groups are resolved from
+ * metadata only ({@code planClusterGroupQuery}); the async cursor factory
downloads one bundle ({@code __base$<ids>})
+ * per surviving group.
+ */
+class PartialQueryableIndexCursorFactoryClusteredTest extends
PartialQueryableIndexCursorFactoryTestBase
+{
+ private static final long T0 = DateTimes.of("2025-01-01").getMillis();
+
+ // tenants sort to dictionary ids acme=0, globex=1 → bundles __base$0 (2
rows) and __base$1 (1 row)
+ private static final String ACME_BUNDLE = "__base$0";
+ private static final String GLOBEX_BUNDLE = "__base$1";
+
+ @TempDir
+ static File sharedTempDir;
+
+ private static File segmentDir;
+
+ @BeforeAll
+ static void buildSegment()
+ {
+ final ClusteredValueGroupsBaseTableProjectionSpec clusterSpec =
+ ClusteredValueGroupsBaseTableProjectionSpec.builder()
+ .columns(
+ new StringDimensionSchema("tenant"),
+ new StringDimensionSchema("region"),
+ new LongDimensionSchema("__time")
+ )
+ .clusteringColumns("tenant")
+ .build();
+ final IncrementalIndexSchema schema =
+ IncrementalIndexSchema.builder()
+ .withMinTimestamp(T0)
+ .withTimestampSpec(new TimestampSpec("ts",
"millis", null))
+ .withQueryGranularity(Granularities.NONE)
+
.withDimensionsSpec(clusterSpec.getDimensionsSpec())
+ .withRollup(false)
+ .withClusterSpec(clusterSpec)
+ .build();
+ final File tmpDir = new File(sharedTempDir, "build_" +
ThreadLocalRandom.current().nextInt());
+ segmentDir = IndexBuilder.create()
+ .useV10()
+ .tmpDir(tmpDir)
+
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+ .schema(schema)
+
.indexSpec(IndexSpec.builder().withMetadataCompression(CompressionStrategy.NONE).build())
+ // ingest out of clustering order; the writer
sorts groups by clustering value
+ .rows(List.of(
+ row(T0 + 2, "globex", "eu-west-1"),
+ row(T0, "acme", "us-east-1"),
+ row(T0 + 1, "acme", "us-west-2")
+ ))
+ .buildMMappedIndexFile();
+ }
+
+ @Test
+ void testFilterOnClusteringColumnDownloadsOnlySurvivingGroup() throws
IOException
+ {
+ final CountingRangeReader rangeReader = new
CountingRangeReader(segmentDir);
+ try (IndexAndMapper opened = openIndex(rangeReader, "filter_acme")) {
+ final PartialQueryableIndexCursorFactory factory =
factory(opened.index());
+ final CursorBuildSpec spec = CursorBuildSpec.builder()
+ .setFilter(new
EqualityFilter("tenant", ColumnType.STRING, "acme", null))
+ .build();
+
+ try (AsyncCursorHolder asyncHolder = factory.makeCursorHolderAsync(spec);
+ CursorHolder holder = asyncHolder.release()) {
+ // Only the acme group survives the clustering-column filter, so only
its rows are returned.
+ Assertions.assertEquals(
+ List.of(List.of("acme", "us-east-1"), List.of("acme",
"us-west-2")),
+ scanTenantRegion(holder)
+ );
+
+ final Set<String> downloaded = opened.mapper().getDownloadedFiles();
+ // The acme group's columns were materialized.
+ Assertions.assertTrue(downloaded.contains(ACME_BUNDLE + "/region"),
"got: " + downloaded);
+ Assertions.assertTrue(downloaded.contains(ACME_BUNDLE + "/__time"),
"got: " + downloaded);
+ // The globex group bundle was pruned by metadata-only resolution and
never touched.
+ Assertions.assertTrue(
+ downloaded.stream().noneMatch(f -> f.startsWith(GLOBEX_BUNDLE +
"/")),
+ "globex group must not be downloaded; got: " + downloaded
+ );
+ }
+ }
+ }
+
+ @Test
+ void testFilterMatchingNoGroupDownloadsNothing() throws IOException
+ {
+ final CountingRangeReader rangeReader = new
CountingRangeReader(segmentDir);
+ try (IndexAndMapper opened = openIndex(rangeReader, "filter_none")) {
+ final PartialQueryableIndexCursorFactory factory =
factory(opened.index());
+ final CursorBuildSpec spec = CursorBuildSpec.builder()
+ .setFilter(new
EqualityFilter("tenant", ColumnType.STRING, "nobody", null))
+ .build();
+
+ try (AsyncCursorHolder asyncHolder = factory.makeCursorHolderAsync(spec);
+ CursorHolder holder = asyncHolder.release()) {
+ Assertions.assertTrue(scanTenantRegion(holder).isEmpty(), "no group
should survive");
+ Assertions.assertTrue(
+ opened.mapper().getDownloadedFiles().isEmpty(),
+ "no group bundle should be downloaded when nothing survives; got:
" + opened.mapper().getDownloadedFiles()
+ );
+ }
+ }
+ }
+
+ @Test
+ void testFullScanConcatenatesAllGroups() throws IOException
+ {
+ final CountingRangeReader rangeReader = new
CountingRangeReader(segmentDir);
+ try (IndexAndMapper opened = openIndex(rangeReader, "full_scan")) {
+ final PartialQueryableIndexCursorFactory factory =
factory(opened.index());
+
+ try (AsyncCursorHolder asyncHolder =
factory.makeCursorHolderAsync(CursorBuildSpec.FULL_SCAN);
+ CursorHolder holder = asyncHolder.release()) {
+ // Groups are concatenated in clustering (dictionary id) order: acme
then globex, clustering constant injected.
+ Assertions.assertEquals(
+ List.of(
+ List.of("acme", "us-east-1"),
+ List.of("acme", "us-west-2"),
+ List.of("globex", "eu-west-1")
+ ),
+ scanTenantRegion(holder)
+ );
+
+ final Set<String> downloaded = opened.mapper().getDownloadedFiles();
+ Assertions.assertTrue(downloaded.contains(ACME_BUNDLE + "/region"),
"got: " + downloaded);
+ Assertions.assertTrue(downloaded.contains(GLOBEX_BUNDLE + "/region"),
"got: " + downloaded);
+ }
+ }
+ }
+
+ @Test
+ void testSyncMakeCursorHolderAfterFullDownload() throws IOException
+ {
+ final CountingRangeReader rangeReader = new
CountingRangeReader(segmentDir);
+ try (IndexAndMapper opened = openIndex(rangeReader, "sync_full")) {
+ final PartialSegmentFileMapperV10 mapper = opened.mapper();
+ final PartialQueryableIndexCursorFactory factory =
factory(opened.index());
+
+ // Sync path refuses until everything is resident.
+ Assertions.assertThrows(
+ DruidException.class,
+ () -> factory.makeCursorHolder(CursorBuildSpec.FULL_SCAN)
+ );
+
+ // Eager-acquire equivalent: materialize every internal file, then the
sync clustered path works via the delegate.
+
mapper.ensureFilesAvailable(mapper.getSegmentFileMetadata().getFiles().keySet());
+ Assertions.assertTrue(mapper.isFullyDownloaded());
+ try (CursorHolder holder =
factory.makeCursorHolder(CursorBuildSpec.FULL_SCAN)) {
+ Assertions.assertEquals(
+ List.of(
+ List.of("acme", "us-east-1"),
+ List.of("acme", "us-west-2"),
+ List.of("globex", "eu-west-1")
+ ),
+ scanTenantRegion(holder)
+ );
+ }
+ }
+ }
+
+ @Test
+ void testAsyncMultiGroupDefersDownloadUntilExecutorRuns() throws Exception
+ {
+ final CountingRangeReader rangeReader = new
CountingRangeReader(segmentDir);
+ final CountDownLatch gate = new CountDownLatch(1);
+ final ExecutorService rawExec =
Execs.singleThreaded("partial-clustered-defer-%d");
+ final ListeningExecutorService gatedExec =
MoreExecutors.listeningDecorator(rawExec);
+ try (IndexAndMapper opened = openIndex(rangeReader, "async_defer")) {
+ final PartialQueryableIndexCursorFactory factory = new
PartialQueryableIndexCursorFactory(
+ opened.index(),
+ QueryableIndexTimeBoundaryInspector.create(opened.index()),
+ noOpAcquirer(gatedExec)
+ );
+ rangeReader.resetCount();
+
+ // Block the download executor so no group download can start yet.
+ @SuppressWarnings("unused")
+ ListenableFuture<?> unused = gatedExec.submit(() -> {
+ try {
+ gate.await();
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ });
+
+ // FULL_SCAN survives both groups → both bundles must download before
the holder is ready.
+ final AsyncCursorHolder asyncHolder =
factory.makeCursorHolderAsync(CursorBuildSpec.FULL_SCAN);
+ Assertions.assertFalse(asyncHolder.isReady(), "must wait while the
download executor is blocked");
+ Assertions.assertEquals(0, rangeReader.getReadCount(), "no group
download should have started yet");
+
+ final CountDownLatch ready = new CountDownLatch(1);
+ asyncHolder.addReadyCallback(ready::countDown);
+ gate.countDown();
+ Assertions.assertTrue(ready.await(15, TimeUnit.SECONDS), "ready callback
must fire once downloads run");
+ Assertions.assertTrue(asyncHolder.isReady());
+
+ try (CursorHolder holder = asyncHolder.release()) {
+ Assertions.assertEquals(
+ List.of(
+ List.of("acme", "us-east-1"),
+ List.of("acme", "us-west-2"),
+ List.of("globex", "eu-west-1")
+ ),
+ scanTenantRegion(holder)
+ );
+ }
+ }
+ finally {
+ gatedExec.shutdownNow();
+ rawExec.shutdownNow();
+ }
+ }
+
+ @Test
+ void testMultiGroupSuccessCloseReleasesEveryBundleHold() throws IOException
+ {
+ final CountingRangeReader rangeReader = new
CountingRangeReader(segmentDir);
+ final CountingBundleAcquirer acquirer = new
CountingBundleAcquirer(directExec());
+ try (IndexAndMapper opened = openIndex(rangeReader, "release_success")) {
+ final PartialQueryableIndexCursorFactory factory = new
PartialQueryableIndexCursorFactory(
+ opened.index(),
+ QueryableIndexTimeBoundaryInspector.create(opened.index()),
+ acquirer
+ );
+
+ // FULL_SCAN survives both groups → both bundles acquired during the
build; with the direct executor the
+ // downloads run inline so the holder is immediately ready.
+ try (AsyncCursorHolder asyncHolder =
factory.makeCursorHolderAsync(CursorBuildSpec.FULL_SCAN)) {
+ Assertions.assertTrue(asyncHolder.isReady());
+ Assertions.assertEquals(1, acquirer.acquiredCount(ACME_BUNDLE), "acme
group bundle acquired");
+ Assertions.assertEquals(1, acquirer.acquiredCount(GLOBEX_BUNDLE),
"globex group bundle acquired");
+
+ final CursorHolder holder = asyncHolder.release();
+ // The produced holder owns every group's hold; nothing is released
until it closes.
+ Assertions.assertEquals(0, acquirer.releasedCount(ACME_BUNDLE));
+ Assertions.assertEquals(0, acquirer.releasedCount(GLOBEX_BUNDLE));
+
+ holder.close();
+ // Closing the holder fans out the release across all surviving
groups' bundle holds.
+ Assertions.assertEquals(1, acquirer.releasedCount(ACME_BUNDLE), "acme
hold released on holder close");
+ Assertions.assertEquals(1, acquirer.releasedCount(GLOBEX_BUNDLE),
"globex hold released on holder close");
+ }
+ }
+ }
+
+ @Test
+ void testMultiGroupCancelBeforeReadyReleasesEveryBundleHold() throws
Exception
+ {
+ final CountingRangeReader rangeReader = new
CountingRangeReader(segmentDir);
+ final CountDownLatch gate = new CountDownLatch(1);
+ final ExecutorService rawExec =
Execs.singleThreaded("partial-clustered-cancel-%d");
+ final ListeningExecutorService gatedExec =
MoreExecutors.listeningDecorator(rawExec);
+ final CountingBundleAcquirer acquirer = new
CountingBundleAcquirer(gatedExec);
+ try (IndexAndMapper opened = openIndex(rangeReader, "release_cancel")) {
+ final PartialQueryableIndexCursorFactory factory = new
PartialQueryableIndexCursorFactory(
+ opened.index(),
+ QueryableIndexTimeBoundaryInspector.create(opened.index()),
+ acquirer
+ );
+
+ // Block the download executor so the per-column download bodies stay
queued and the holder never becomes ready.
+ @SuppressWarnings("unused")
+ ListenableFuture<?> unused = gatedExec.submit(() -> {
+ try {
+ gate.await();
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ });
Review Comment:
## CodeQL / Unread local variable
Variable 'ListenableFuture<?> unused' is never read.
[Show more
details](https://github.com/apache/druid/security/code-scanning/11318)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]