github-advanced-security[bot] commented on code in PR #18873:
URL: https://github.com/apache/druid/pull/18873#discussion_r2667311941


##########
multi-stage-query/src/test/java/org/apache/druid/msq/input/RegularLoadableSegmentTest.java:
##########
@@ -0,0 +1,530 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.input;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.druid.client.coordinator.NoopCoordinatorClient;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.jackson.SegmentizerModule;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.query.expression.TestExprMacroTable;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.PhysicalSegmentInspector;
+import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.TestIndex;
+import org.apache.druid.segment.loading.AcquireSegmentAction;
+import org.apache.druid.segment.loading.AcquireSegmentResult;
+import org.apache.druid.segment.loading.LeastBytesUsedStorageLocationSelectorStrategy;
+import org.apache.druid.segment.loading.LoadSpec;
+import org.apache.druid.segment.loading.SegmentLoaderConfig;
+import org.apache.druid.segment.loading.SegmentLoadingException;
+import org.apache.druid.segment.loading.SegmentLocalCacheManager;
+import org.apache.druid.segment.loading.StorageLocation;
+import org.apache.druid.segment.loading.StorageLocationConfig;
+import org.apache.druid.server.SegmentManager;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.apache.druid.utils.CompressionUtils;
+import org.joda.time.DateTime;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests for {@link RegularLoadableSegment}.
+ */
+class RegularLoadableSegmentTest extends InitializedNullHandlingTest
+{
+  private static final String DATASOURCE = "foo";
+  private static final int NUM_SEGMENTS = 10;
+  private static final int THREADS = 8;
+  private static File SEGMENT_ZIP_FILE;
+
+  @TempDir
+  public Path tempDir;
+
+  private List<DataSegment> segments;
+  private File cacheDir;
+  private File preLoadCacheDir;
+  private SegmentManager segmentManagerDynamic;
+  private SegmentManager segmentManagerPreLoad;
+  private ListeningExecutorService exec;
+
+  @BeforeAll
+  public static void setupStatic(@TempDir Path tempDir) throws IOException
+  {
+    EmittingLogger.registerEmitter(new NoopServiceEmitter());
+    final File segmentFile = TestIndex.persist(
+        TestIndex.getIncrementalTestIndex(),
+        IndexSpec.getDefault(),
+        tempDir.resolve("segment").toFile()
+    );
+    final File zipPath = tempDir.resolve("zip").toFile();
+    FileUtils.mkdirp(zipPath);
+    SEGMENT_ZIP_FILE = new File(zipPath, "index.zip");
+    CompressionUtils.zip(segmentFile, SEGMENT_ZIP_FILE);
+  }
+
+  @BeforeEach
+  public void setUp() throws Exception
+  {
+    final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
+    jsonMapper.registerSubtypes(TestLoadSpec.class);
+    jsonMapper.registerModule(new SegmentizerModule());
+    jsonMapper.setInjectableValues(
+        new InjectableValues.Std()
+            .addValue(ExprMacroTable.class.getName(), TestExprMacroTable.INSTANCE)
+            .addValue(ObjectMapper.class.getName(), jsonMapper)
+            .addValue(DataSegment.PruneSpecsHolder.class, DataSegment.PruneSpecsHolder.DEFAULT)
+            .addValue(IndexIO.class, TestIndex.INDEX_IO)
+    );
+
+    segments = new ArrayList<>();
+
+    for (int i = 0; i < NUM_SEGMENTS; i++) {
+      // Two segments per interval; helps verify that directory creation and deletion do not race.
+      final DateTime startTime = DateTimes.of("2000").plusDays(i / 2);
+      final int partitionNum = i % 2;
+
+      segments.add(
+          DataSegment.builder()

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [DataSegment.builder](1) should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/10632)
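   
   A possible remediation, sketched under assumptions: the @Deprecated note on DataSegment.builder() should name
   the intended replacement; the sketch below assumes it is an overload that seeds the builder from a SegmentId.
   Verify against the current DataSegment javadoc before adopting.
   
   ```java
   import org.apache.druid.timeline.DataSegment;
   import org.apache.druid.timeline.SegmentId;
   import org.apache.druid.timeline.partition.NumberedShardSpec;
   import org.joda.time.DateTime;
   import org.joda.time.Interval;
   
   // Hedged sketch: build a test segment without the deprecated no-arg builder(),
   // assuming DataSegment.builder(SegmentId) exists as the non-deprecated overload.
   static DataSegment makeTestSegment(String dataSource, DateTime startTime, int partitionNum)
   {
     // SegmentId carries the dataSource, interval, version, and partition number.
     final SegmentId segmentId = SegmentId.of(
         dataSource,
         new Interval(startTime, startTime.plusDays(1)),
         "1",
         partitionNum
     );
     return DataSegment.builder(segmentId)
                       .shardSpec(new NumberedShardSpec(partitionNum, 2))
                       .size(1)
                       .build();
   }
   ```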



##########
multi-stage-query/src/main/java/org/apache/druid/msq/querykit/ReadableInputQueue.java:
##########
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit;
+
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.msq.exec.DataServerQueryHandler;
+import org.apache.druid.msq.exec.std.StandardPartitionReader;
+import org.apache.druid.msq.input.LoadableSegment;
+import org.apache.druid.msq.input.PhysicalInputSlice;
+import org.apache.druid.msq.input.stage.ReadablePartition;
+import org.apache.druid.msq.kernel.StageId;
+import org.apache.druid.msq.kernel.StagePartition;
+import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.SegmentReference;
+import org.apache.druid.segment.loading.AcquireSegmentAction;
+import org.apache.druid.utils.CloseableUtils;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Queue for returning {@link ReadableInput} from a list of {@link PhysicalInputSlice}.
+ *
+ * When closed, this object cancels all pending segment loads and releases all segments that have not yet been
+ * acquired by callers through {@link SegmentReferenceHolder#getSegmentReferenceOnce()}. Callers that have
+ * acquired segment references are responsible for closing those references; they will not be closed by this
+ * class.
+ */
+public class ReadableInputQueue implements Closeable
+{
+  private static final Logger log = new Logger(ReadableInputQueue.class);
+
+  /**
+   * Partitions to be read.
+   */
+  @GuardedBy("this")
+  private final Queue<ReadablePartition> readablePartitions = new ArrayDeque<>();
+
+  /**
+   * Segments to be loaded.
+   */
+  @GuardedBy("this")
+  private final Queue<LoadableSegment> loadableSegments = new ArrayDeque<>();
+
+  /**
+   * Realtime servers to be queried.
+   */
+  @GuardedBy("this")
+  private final Queue<DataServerQueryHandler> queryableServers = new ArrayDeque<>();
+
+  /**
+   * Segments currently being loaded.
+   */
+  @GuardedBy("this")
+  private final Set<AcquireSegmentAction> loadingSegments = new LinkedHashSet<>();
+
+  /**
+   * Segments that have been loaded. These are tracked here so we can close them if needed.
+   */
+  @GuardedBy("this")
+  private final Set<SegmentReferenceHolder> loadedSegments = new LinkedHashSet<>();
+
+  /**
+   * Futures that are sitting ready to be handed out by a call to {@link #nextInput()}.
+   */
+  @GuardedBy("this")
+  private final Set<ListenableFuture<ReadableInput>> pendingNextInputs = Sets.newIdentityHashSet();
+
+  private final String queryId;
+  private final StandardPartitionReader partitionReader;
+  private final int loadahead;
+  private final AtomicBoolean started = new AtomicBoolean(false);
+
+  public ReadableInputQueue(
+      final String queryId,
+      final StandardPartitionReader partitionReader,
+      final List<PhysicalInputSlice> slices,
+      final int loadahead
+  )
+  {
+    this.queryId = queryId;
+    this.partitionReader = partitionReader;
+    this.loadahead = loadahead;
+
+    for (final PhysicalInputSlice slice : slices) {
+      loadableSegments.addAll(slice.getLoadableSegments());
+      queryableServers.addAll(slice.getQueryableServers());
+      slice.getReadablePartitions().forEach(readablePartitions::add);
+    }
+  }
+
+  /**
+   * If this method has not yet been called, then:
+   * (1) transition all locally-cached segments out of {@link #loadableSegments}
+   * (2) start loading up to {@link #loadahead} additional segments for future calls to {@link #nextInput()}
+   * If this method has previously been called, subsequent calls do nothing.
+   * This is separated from the constructor because we don't want to acquire resources immediately on construction.
+   */
+  public void start()
+  {
+    if (started.compareAndSet(false, true)) {
+      // (1) acquire all locally-cached segments
+      synchronized (this) {
+        final List<LoadableSegment> toLoad = new ArrayList<>(); // Temporarily store all non-cached segments
+        LoadableSegment loadableSegment;
+        while ((loadableSegment = loadableSegments.poll()) != null) {
+          final Optional<Segment> cachedSegment = loadableSegment.acquireIfCached();
+          if (cachedSegment.isPresent()) {
+            final SegmentReferenceHolder holder = new SegmentReferenceHolder(
+                new SegmentReference(loadableSegment.descriptor(), cachedSegment, null),
+                loadableSegment.inputCounters(),
+                loadableSegment.description()
+            );
+            loadedSegments.add(holder);
+            pendingNextInputs.add(Futures.immediateFuture(ReadableInput.segment(holder)));
+          } else {
+            toLoad.add(loadableSegment);
+          }
+        }
+        loadableSegments.addAll(toLoad); // Put non-cached segments back into loadableSegments
+      }
+
+      // (2) start loading up to "loadahead" additional segments
+      for (int i = 0; i < loadahead; i++) {
+        if (!addLoadaheadFuture()) {
+          break;
+        }
+      }
+    }
+  }
+
+  /**
+   * Returns the number of remaining inputs that can be returned by calls to {@link #nextInput()}.
+   */
+  public int remaining()
+  {
+    synchronized (this) {
+      return readablePartitions.size() + loadableSegments.size() + queryableServers.size() + pendingNextInputs.size();
+    }
+  }
+
+  /**
+   * Returns the next {@link ReadableInput}. The future resolves when the input is ready to read.
+   */
+  @Nullable
+  public ListenableFuture<ReadableInput> nextInput()
+  {
+    if (!started.get()) {
+      throw DruidException.defensive("Not started, must call start() first");
+    }
+
+    ListenableFuture<ReadableInput> future;
+
+    future = nextServerInput();
+    if (future != null) {
+      return future;
+    }
+
+    future = nextChannelInput();
+    if (future != null) {
+      return future;
+    }
+
+    future = nextSegmentInput();
+    if (future != null) {
+      return future;
+    }
+
+    return null;
+  }
+
+  /**
+   * Returns the next input from {@link #queryableServers}, if any. Returns null if none remain.
+   */
+  @Nullable
+  private ListenableFuture<ReadableInput> nextServerInput()
+  {
+    final DataServerQueryHandler handler;
+    synchronized (this) {
+      handler = queryableServers.poll();
+    }
+
+    if (handler == null) {
+      return null;
+    }
+
+    return Futures.immediateFuture(ReadableInput.dataServerQuery(handler));
+  }
+
+  /**
+   * Returns the next input from {@link #readablePartitions}, if any. Returns null if none remain.
+   */
+  @Nullable
+  private ListenableFuture<ReadableInput> nextChannelInput()
+  {
+    final ReadablePartition readablePartition;
+    synchronized (this) {
+      readablePartition = readablePartitions.poll();
+    }
+
+    if (readablePartition == null) {
+      return null;
+    }
+
+    ReadableFrameChannel channel = null;
+    try {
+      channel = partitionReader.openChannel(readablePartition);
+      return Futures.immediateFuture(
+          ReadableInput.channel(
+              channel,
+              partitionReader.frameReader(readablePartition.getStageNumber()),
+              new StagePartition(
+                  new StageId(queryId, readablePartition.getStageNumber()),
+                  readablePartition.getPartitionNumber()
+              )
+          )
+      );
+    }
+    catch (IOException e) {
+      throw CloseableUtils.closeAndWrapInCatch(e, channel);
+    }
+  }
+
+  /**
+   * Returns the next input from {@link #loadableSegments}, if any. Returns null if none remain.
+   */
+  @Nullable
+  private ListenableFuture<ReadableInput> nextSegmentInput()
+  {
+    // Pick a loadahead future, preferring ones that are already loaded.
+    ListenableFuture<ReadableInput> selectedLoadaheadFuture = null;
+    synchronized (this) {
+      for (ListenableFuture<ReadableInput> f : pendingNextInputs) {
+        if (selectedLoadaheadFuture == null || f.isDone()) {
+          selectedLoadaheadFuture = f;
+          if (f.isDone()) {
+            break;
+          }
+        }
+      }
+
+      if (selectedLoadaheadFuture != null) {
+        pendingNextInputs.remove(selectedLoadaheadFuture);
+        if (pendingNextInputs.size() < loadahead) {
+          addLoadaheadFuture(); // Replace the one we just took out.
+        }
+        return selectedLoadaheadFuture;
+      }
+    }
+
+    return loadNextSegment();
+  }
+
+  /**
+   * Load the next segment from {@link #loadableSegments} and return a future to its reference. Returns null
+   * if {@link #loadableSegments} is empty.
+   */
+  @Nullable
+  private ListenableFuture<ReadableInput> loadNextSegment()
+  {
+    synchronized (this) {
+      final LoadableSegment nextLoadableSegment = loadableSegments.poll();
+      if (nextLoadableSegment == null) {
+        return null;
+      }
+
+      final AcquireSegmentAction acquireSegmentAction = nextLoadableSegment.acquire();
+      loadingSegments.add(acquireSegmentAction);
+      return FutureUtils.transform(
+          acquireSegmentAction.getSegmentFuture(),
+          segment -> {
+            synchronized (ReadableInputQueue.this) {
+              // Transfer segment from "loadingSegments" to "loadedSegments" and return a reference to it.
+              if (loadingSegments.remove(acquireSegmentAction)) {
+                try {
+                  final SegmentReferenceHolder referenceHolder = new SegmentReferenceHolder(
+                      new SegmentReference(
+                          nextLoadableSegment.descriptor(),
+                          segment.getReferenceProvider().acquireReference(),
+                          acquireSegmentAction // Release the hold when the SegmentReference is closed.
+                      ),
+                      nextLoadableSegment.inputCounters(),
+                      nextLoadableSegment.description()
+                  );
+                  loadedSegments.add(referenceHolder);
+                  return ReadableInput.segment(referenceHolder);
+                }
+                catch (Throwable e) {
+                  // Javadoc for acquireReference() suggests it can throw exceptions; handle that here
+                  // by closing the original AcquireSegmentAction.
+                  throw CloseableUtils.closeAndWrapInCatch(e, acquireSegmentAction);
+                }
+              } else {
+                throw DruidException.defensive(
+                    "Segment[%s] removed from loadingSegments before loading 
complete. It is possible that close() "
+                    + "was called with futures in flight.",
+                    nextLoadableSegment.description()
+                );
+              }
+            }
+          }
+      );
+    }
+  }
+
+  /**
+   * Calls {@link #loadNextSegment()} and adds the future to {@link #pendingNextInputs}. Returns whether a future
+   * was added.
+   */
+  private boolean addLoadaheadFuture()
+  {
+    final ListenableFuture<ReadableInput> nextFuture = loadNextSegment();
+    if (nextFuture != null) {
+      synchronized (this) {
+        pendingNextInputs.add(nextFuture);
+      }
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public void close()
+  {
+    synchronized (this) {
+      readablePartitions.clear();
+      queryableServers.clear();
+      loadableSegments.clear();
+
+      // Cancel all pending segment loads.
+      for (AcquireSegmentAction acquireSegmentAction : loadingSegments) {
+        CloseableUtils.closeAndSuppressExceptions(
+            acquireSegmentAction,
+            e -> log.warn(e, "Failed to close loadingSegment[%s]", acquireSegmentAction)

Review Comment:
   ## Use of default toString()
   
   Default toString(): AcquireSegmentAction inherits toString() from Object, and so is not suitable for printing.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/10631)
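   
   That is, the log line passes the action object straight to log.warn, so it prints as something like
   AcquireSegmentAction@1a2b3c. Either log an identifier that is already in scope, or give the class a real
   toString(). A minimal sketch of the second option; the field below is an illustrative stand-in, not the
   class's actual state:
   
   ```java
   import java.io.Closeable;
   
   // Sketch: a Closeable action with a descriptive toString(), so that
   // log.warn(e, "Failed to close loadingSegment[%s]", action) prints useful detail.
   // "segmentDescription" is hypothetical; substitute whatever identifying state
   // AcquireSegmentAction really holds.
   final class AcquireSegmentActionSketch implements Closeable
   {
     private final String segmentDescription;
   
     AcquireSegmentActionSketch(final String segmentDescription)
     {
       this.segmentDescription = segmentDescription;
     }
   
     @Override
     public void close()
     {
       // Release any held segment resources here.
     }
   
     @Override
     public String toString()
     {
       return "AcquireSegmentAction{segment=" + segmentDescription + '}';
     }
   }
   ```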



##########
server/src/test/java/org/apache/druid/test/utils/TestSegmentManager.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.test.utils;
+
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.loading.SegmentLoadingException;
+import org.apache.druid.server.SegmentManager;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Wrapper around {@link SegmentManager} using {@link TestSegmentCacheManager} that helps with adding in-memory
+ * test segments.
+ */
+public class TestSegmentManager
+{
+  private final SegmentManager segmentManager;
+  private final TestSegmentCacheManager cacheManager;
+  private final ConcurrentMap<SegmentId, DataSegment> testGeneratedDataSegments = new ConcurrentHashMap<>();
+  private final ConcurrentMap<SegmentId, DataSegment> addedSegments = new ConcurrentHashMap<>();
+
+  public TestSegmentManager()
+  {
+    this.cacheManager = new TestSegmentCacheManager();
+    this.segmentManager = new SegmentManager(cacheManager);
+  }
+
+  /**
+   * Creates a minimal DataSegment for a test Segment.
+   */
+  public static DataSegment createDataSegmentForTest(final SegmentId segmentId)
+  {
+    return DataSegment.builder()

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [DataSegment.builder](1) should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/10633)
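   
   Same alert as in RegularLoadableSegmentTest above; the fix is more direct here because the method already
   receives a SegmentId. As before, this assumes DataSegment.builder(SegmentId) is the non-deprecated
   replacement; confirm against the deprecation note.
   
   ```java
   import org.apache.druid.timeline.DataSegment;
   import org.apache.druid.timeline.SegmentId;
   import org.apache.druid.timeline.partition.NumberedShardSpec;
   
   // Hedged sketch: seed the builder from the SegmentId the method already has,
   // rather than calling the deprecated no-arg DataSegment.builder().
   public static DataSegment createDataSegmentForTest(final SegmentId segmentId)
   {
     return DataSegment.builder(segmentId)
                       .shardSpec(new NumberedShardSpec(segmentId.getPartitionNum(), 0))
                       .size(1)
                       .build();
   }
   ```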



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

