github-advanced-security[bot] commented on code in PR #18880:
URL: https://github.com/apache/druid/pull/18880#discussion_r2655981947


##########
processing/src/main/java/org/apache/druid/segment/file/SegmentFileMapperV10.java:
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.file;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Ints;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.ByteBufferUtils;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.utils.CloseableUtils;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * {@link SegmentFileMapper} implementation for V10 segment files.
+ * <p>
+ * V10 file format:
+ * | version (byte) | meta compression (byte) | meta length (int) | meta json 
| chunk 0 | chunk 1 | ... | chunk n |
+ */
+public class SegmentFileMapperV10 implements SegmentFileMapper
+{
+  /**
+   * Create a v10 {@link SegmentFileMapper} with 'external' attached v10 
segment files
+   *
+   * @param segmentFile v10 segment file with name {@link 
IndexIO#V10_FILE_NAME}
+   * @param mapper      json mapper to deserialize metadata
+   * @param externals   list of 'external' v10 segment files to attach to this 
mapper and files that can be referenced
+   *                    using {@link #mapExternalFile(String, String)}
+   * @return v10 {@link SegmentFileMapper} using memory mapped {@link 
ByteBuffer}
+   * @throws IOException
+   */
+  public static SegmentFileMapperV10 create(
+      File segmentFile,
+      ObjectMapper mapper,
+      List<String> externals
+  ) throws IOException
+  {
+    final SegmentFileMapperV10 entryPoint = create(segmentFile, mapper);
+
+    final Map<String, SegmentFileMapperV10> externalMappers = new HashMap<>();
+    try {
+      for (String filename : externals) {
+        final File externalFile = new File(segmentFile.getParentFile(), 
filename);
+        if (externalFile.exists()) {
+          externalMappers.put(filename, create(externalFile, mapper));
+        }
+      }
+    }
+    catch (Throwable t) {
+      Closer closer = Closer.create();
+      closer.registerAll(externalMappers.values());
+      throw CloseableUtils.closeAndWrapInCatch(t, closer);
+    }
+
+    return new SegmentFileMapperV10(
+        entryPoint.segmentFile,
+        entryPoint.segmentFileMetadata,
+        entryPoint.containers,
+        externalMappers
+    );
+  }
+
+  /**
+   * Create a v10 {@link SegmentFileMapper}
+   */
+  public static SegmentFileMapperV10 create(
+      File segmentFile,
+      ObjectMapper mapper
+  ) throws IOException
+  {
+    try (FileInputStream fis = new FileInputStream(segmentFile)) {
+      // version (byte) | metadata compression (byte) | metadata length (int)
+      byte[] header = new byte[1 + 1 + Integer.BYTES];
+      int read = fis.read(header);
+      if (read < header.length) {
+        throw DruidException.defensive("expected at least [%s] bytes, but only 
read [%s]", header.length, read);
+      }
+      ByteBuffer headerBuffer = ByteBuffer.wrap(header);
+      headerBuffer.order(ByteOrder.LITTLE_ENDIAN);
+
+      if (headerBuffer.get(0) != IndexIO.V10_VERSION) {
+        throw DruidException.defensive("not v10, got[%s] instead", 
headerBuffer.get(0));
+      }
+
+      // ideally we should make compression work, right now only uncompressed 
is supported (we probably need to add
+      // another int for compressed length if strategy is to be compressed)
+      byte compression = headerBuffer.get(1);
+      CompressionStrategy compressionStrategy = 
CompressionStrategy.forId(compression);
+      if (!CompressionStrategy.NONE.equals(compressionStrategy)) {
+        throw DruidException.defensive("compression strategy[%s] not 
supported", compressionStrategy);
+      }
+      int metaLength = headerBuffer.getInt(2);
+
+      byte[] meta = new byte[metaLength];
+      read = fis.read(meta);
+      if (read < meta.length) {
+        throw DruidException.defensive("read[%s] which is less than expected 
metadata length[%s]", read, metaLength);
+      }
+      final int startOffset = header.length + meta.length;
+      final SegmentFileMetadata metadata = mapper.readValue(meta, 
SegmentFileMetadata.class);
+      final List<MappedByteBuffer> containers = 
Lists.newArrayListWithCapacity(metadata.getContainers().size());
+
+      // eagerly map all container buffers so we can ensure they all share the 
same file descriptor without needing to
+      // maintain an open channel (which could be closed during an interrupt 
for example)
+      try (RandomAccessFile f = new RandomAccessFile(segmentFile, "r");

Review Comment:
   ## Uncontrolled data used in path expression
   
   This path depends on a [user-provided value](1).
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/10610)



##########
processing/src/main/java/org/apache/druid/segment/file/SegmentFileMapperV10.java:
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.file;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Ints;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.ByteBufferUtils;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.utils.CloseableUtils;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * {@link SegmentFileMapper} implementation for V10 segment files.
+ * <p>
+ * V10 file format:
+ * | version (byte) | meta compression (byte) | meta length (int) | meta json 
| chunk 0 | chunk 1 | ... | chunk n |
+ */
+public class SegmentFileMapperV10 implements SegmentFileMapper
+{
+  /**
+   * Create a v10 {@link SegmentFileMapper} with 'external' attached v10 
segment files
+   *
+   * @param segmentFile v10 segment file with name {@link 
IndexIO#V10_FILE_NAME}
+   * @param mapper      json mapper to deserialize metadata
+   * @param externals   list of 'external' v10 segment files to attach to this 
mapper and files that can be referenced
+   *                    using {@link #mapExternalFile(String, String)}
+   * @return v10 {@link SegmentFileMapper} using memory mapped {@link 
ByteBuffer}
+   * @throws IOException
+   */
+  public static SegmentFileMapperV10 create(
+      File segmentFile,
+      ObjectMapper mapper,
+      List<String> externals
+  ) throws IOException
+  {
+    final SegmentFileMapperV10 entryPoint = create(segmentFile, mapper);
+
+    final Map<String, SegmentFileMapperV10> externalMappers = new HashMap<>();
+    try {
+      for (String filename : externals) {
+        final File externalFile = new File(segmentFile.getParentFile(), 
filename);
+        if (externalFile.exists()) {
+          externalMappers.put(filename, create(externalFile, mapper));
+        }
+      }
+    }
+    catch (Throwable t) {
+      Closer closer = Closer.create();
+      closer.registerAll(externalMappers.values());
+      throw CloseableUtils.closeAndWrapInCatch(t, closer);
+    }
+
+    return new SegmentFileMapperV10(
+        entryPoint.segmentFile,
+        entryPoint.segmentFileMetadata,
+        entryPoint.containers,
+        externalMappers
+    );
+  }
+
+  /**
+   * Create a v10 {@link SegmentFileMapper}
+   */
+  public static SegmentFileMapperV10 create(
+      File segmentFile,
+      ObjectMapper mapper
+  ) throws IOException
+  {
+    try (FileInputStream fis = new FileInputStream(segmentFile)) {

Review Comment:
   ## Uncontrolled data used in path expression
   
   This path depends on a [user-provided value](1).
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/10609)



##########
processing/src/main/java/org/apache/druid/segment/SegmentUtils.java:
##########
@@ -79,6 +79,14 @@
       return version;
     }
 
+    final File v10IndexFile = new File(inDir, IndexIO.V10_FILE_NAME);
+    if (v10IndexFile.exists()) {
+      try (InputStream in = new FileInputStream(v10IndexFile)) {

Review Comment:
   ## Uncontrolled data used in path expression
   
   This path depends on a [user-provided value](1).
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/10608)



##########
processing/src/main/java/org/apache/druid/segment/file/SegmentFileBuilderV10.java:
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.file;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.primitives.Ints;
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.column.ColumnDescriptor;
+import org.apache.druid.segment.data.BitmapSerdeFactory;
+import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.projections.ProjectionMetadata;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * {@link SegmentFileBuilder} for V10 format segments. Right now, this uses a 
{@link FileSmoosher} underneath to build
+ * V9 smoosh files and collect the metadata about the offsets in those 
containers, and then appends them into the V10
+ * consolidated segment file after the header and {@link SegmentFileMetadata} 
is written.
+ * <p>
+ * V10 file format:
+ * | version (byte) | meta compression (byte) | meta length (int) | meta json 
| chunk 0 | chunk 1 | ... | chunk n |
+ */
+public class SegmentFileBuilderV10 implements SegmentFileBuilder
+{
+  private final ObjectMapper jsonMapper;
+  private final String outputFileName;
+  private final File baseDir;
+  private final long maxChunkSize;
+  private final FileSmoosher smoosher;
+  private final Map<String, SegmentFileBuilderV10> externalSegmentFileBuilders;
+  private final Map<String, ColumnDescriptor> columns = new TreeMap<>();
+
+  @Nullable
+  private String interval = null;
+  @Nullable
+  private BitmapSerdeFactory bitmapEncoding = null;
+  @Nullable
+  private List<ProjectionMetadata> projections = null;
+
+  public SegmentFileBuilderV10(
+      ObjectMapper jsonMapper,
+      String outputFileName,
+      File baseDir,
+      long maxChunkSize
+  )
+  {
+    this.jsonMapper = jsonMapper;
+    this.outputFileName = outputFileName;
+    this.baseDir = baseDir;
+    this.maxChunkSize = maxChunkSize;
+    this.smoosher = new FileSmoosher(baseDir, Ints.checkedCast(maxChunkSize), 
outputFileName);
+    this.externalSegmentFileBuilders = new TreeMap<>();
+  }
+
+  @Override
+  public void add(String name, File fileToAdd) throws IOException
+  {
+    smoosher.add(name, fileToAdd);
+  }
+
+  @Override
+  public void add(String name, ByteBuffer bufferToAdd) throws IOException
+  {
+    smoosher.add(name, bufferToAdd);
+  }
+
+  @Override
+  public SegmentFileChannel addWithChannel(String name, long size) throws 
IOException
+  {
+    return smoosher.addWithChannel(name, size);
+  }
+
+  @Override
+  public SegmentFileBuilder getExternalBuilder(String externalFile)
+  {
+    return externalSegmentFileBuilders.computeIfAbsent(
+        externalFile,
+        (k) -> new SegmentFileBuilderV10(jsonMapper, externalFile, baseDir, 
maxChunkSize)
+    );
+  }
+
+  @Override
+  public void addColumn(String name, ColumnDescriptor columnDescriptor)
+  {
+    this.columns.put(name, columnDescriptor);
+  }
+
+  public void addInterval(String interval)
+  {
+    this.interval = interval;
+  }
+
+  public void addBitmapEncoding(BitmapSerdeFactory bitmapEncoding)
+  {
+    this.bitmapEncoding = bitmapEncoding;
+  }
+
+  public void addProjections(List<ProjectionMetadata> projections)
+  {
+    this.projections = projections;
+  }
+
+  @Override
+  public void close() throws IOException
+  {
+    for (SegmentFileBuilderV10 externalBuilder : 
externalSegmentFileBuilders.values()) {
+      externalBuilder.close();
+    }
+
+    smoosher.close();
+
+    SegmentFileMetadata segmentFileMetadata = new SegmentFileMetadata(
+        smoosher.getContainers(),
+        smoosher.getInternalFiles(),
+        interval,
+        bitmapEncoding,
+        columns.isEmpty() ? null : columns,
+        projections
+    );
+
+    final byte[] metadataBytes = 
jsonMapper.writeValueAsBytes(segmentFileMetadata);
+
+    final FileOutputStream outputStream = new FileOutputStream(new 
File(baseDir, outputFileName));

Review Comment:
   ## Potential output resource leak
   
   This FileOutputStream is not always closed on method exit.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/10611)



##########
processing/src/main/java/org/apache/druid/segment/IndexMergerV10.java:
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.JodaUtils;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.column.ColumnDescriptor;
+import org.apache.druid.segment.column.ColumnFormat;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.file.SegmentFileBuilder;
+import org.apache.druid.segment.file.SegmentFileBuilderV10;
+import org.apache.druid.segment.file.SegmentFileChannel;
+import org.apache.druid.segment.loading.SegmentizerFactory;
+import org.apache.druid.segment.projections.ProjectionMetadata;
+import org.apache.druid.segment.projections.Projections;
+import org.apache.druid.segment.serde.NullColumnPartSerde;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
+import org.apache.druid.utils.CollectionUtils;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * {@link IndexMerger} for creating v10 format segments with {@link 
SegmentFileBuilderV10}
+ *
+ * @see SegmentFileBuilderV10
+ * @see org.apache.druid.segment.file.SegmentFileMapperV10 to read the 
resulting segment file
+ */
+public class IndexMergerV10 extends IndexMergerBase
+{
+  private static final Logger log = new Logger(IndexMergerV10.class);
+
+  public IndexMergerV10(
+      ObjectMapper mapper,
+      IndexIO indexIO,
+      SegmentWriteOutMediumFactory defaultSegmentWriteOutMediumFactory
+  )
+  {
+    super(mapper, indexIO, defaultSegmentWriteOutMediumFactory);
+  }
+
+  @Override
+  protected boolean shouldStoreEmptyColumns()
+  {
+    return true;
+  }
+
+  @Override
+  protected File makeIndexFiles(
+      final List<IndexableAdapter> adapters,
+      final @Nullable Metadata segmentMetadata,
+      final File outDir,
+      final ProgressIndicator progress,
+      final List<String> mergedDimensionsWithTime, // has both explicit and 
implicit dimensions, as well as __time
+      final DimensionsSpecInspector dimensionsSpecInspector,
+      final List<String> mergedMetrics,
+      final Function<List<TransformableRowIterator>, TimeAndDimsIterator> 
rowMergerFn,
+      final IndexSpec indexSpec,
+      final @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
+  ) throws IOException
+  {
+    final String basePrefix = Projections.BASE_TABLE_PROJECTION_NAME + "/";
+
+    progress.start();
+    progress.progress();
+
+    // Merged dimensions without __time.
+    List<String> mergedDimensions =
+        mergedDimensionsWithTime.stream()
+                                .filter(dim -> 
!ColumnHolder.TIME_COLUMN_NAME.equals(dim))
+                                .collect(Collectors.toList());
+    Closer closer = Closer.create();
+    try {
+      final SegmentFileBuilderV10 v10Smoosher = new SegmentFileBuilderV10(
+          mapper,
+          IndexIO.V10_FILE_NAME,
+          outDir,
+          Integer.MAX_VALUE
+      );
+
+      DateTime minTime = DateTimes.MAX;
+      DateTime maxTime = DateTimes.MIN;
+
+      for (IndexableAdapter index : adapters) {
+        minTime = JodaUtils.minDateTime(minTime, 
index.getDataInterval().getStart());
+        maxTime = JodaUtils.maxDateTime(maxTime, 
index.getDataInterval().getEnd());
+      }
+      final Interval dataInterval = new Interval(minTime, maxTime);
+      v10Smoosher.addInterval(dataInterval.toString());
+      v10Smoosher.addBitmapEncoding(indexSpec.getBitmapSerdeFactory());
+
+      FileUtils.mkdirp(outDir);
+
+      SegmentWriteOutMediumFactory omf = segmentWriteOutMediumFactory != null
+                                         ? segmentWriteOutMediumFactory
+                                         : defaultSegmentWriteOutMediumFactory;
+      log.debug("Using SegmentWriteOutMediumFactory[%s]", 
omf.getClass().getSimpleName());
+      SegmentWriteOutMedium segmentWriteOutMedium = 
omf.makeSegmentWriteOutMedium(outDir);
+      closer.register(segmentWriteOutMedium);
+      long startTime = System.currentTimeMillis();
+
+      SegmentizerFactory customSegmentLoader = indexSpec.getSegmentLoader();
+      if (customSegmentLoader != null) {
+        try (FileOutputStream fos = new FileOutputStream(new File(outDir, 
"factory.json"))) {
+          mapper.writeValue(fos, customSegmentLoader);
+          log.debug("Completed factory.json in %,d millis", 
System.currentTimeMillis() - startTime);
+        }
+      }
+
+      progress.progress();
+      // this whole section should also just be writing a projection for the 
base table using the same stuff, however,
+      // the method that builds projections right now is missing the logic to 
mark mergers as 'parents' if any other
+      // projections are relying on them (which keeps certain resources around 
longer so that they are still available
+      // when merging the projection tables), and so it lives separately for 
now... will fix later
+      final Map<String, ColumnFormat> metricFormats = new 
TreeMap<>(Comparators.naturalNullsFirst());
+      final List<ColumnFormat> dimFormats = 
Lists.newArrayListWithCapacity(mergedDimensions.size());
+      mergeFormat(adapters, mergedDimensions, metricFormats, dimFormats);
+
+      final Map<String, DimensionHandler> handlers = 
makeDimensionHandlers(mergedDimensions, dimFormats);
+      final Map<String, DimensionMergerV9> mergersMap = 
Maps.newHashMapWithExpectedSize(mergedDimensions.size());
+      final List<DimensionMergerV9> mergers = new ArrayList<>();
+      for (int i = 0; i < mergedDimensions.size(); i++) {
+        DimensionHandler handler = handlers.get(mergedDimensions.get(i));
+        DimensionMergerV9 merger = handler.makeMerger(
+            basePrefix + mergedDimensions.get(i),
+            indexSpec,
+            segmentWriteOutMedium,
+            dimFormats.get(i).toColumnCapabilities(),
+            progress,
+            outDir,
+            closer
+        );
+        mergers.add(merger);
+        mergersMap.put(mergedDimensions.get(i), merger);
+      }
+
+      // this part right here does the parent marking
+      if (segmentMetadata != null && segmentMetadata.getProjections() != null) 
{
+        for (AggregateProjectionMetadata projectionMetadata : 
segmentMetadata.getProjections()) {
+          for (String dimension : 
projectionMetadata.getSchema().getGroupingColumns()) {
+            DimensionMergerV9 merger = mergersMap.get(dimension);
+            if (merger != null) {
+              merger.markAsParent();
+            }
+          }
+        }
+      }
+
+      /************* Setup Dim Conversions **************/
+      progress.progress();
+      startTime = System.currentTimeMillis();
+      writeDimValuesAndSetupDimConversion(adapters, progress, 
mergedDimensions, mergers);
+      log.debug("Completed dim conversions in %,d millis.", 
System.currentTimeMillis() - startTime);
+
+      /************* Walk through data sets, merge them, and write merged 
columns *************/
+      progress.progress();
+      final TimeAndDimsIterator timeAndDimsIterator = 
makeMergedTimeAndDimsIterator(
+          adapters,
+          mergedDimensionsWithTime,
+          mergedMetrics,
+          rowMergerFn,
+          handlers,
+          mergers
+      );
+      closer.register(timeAndDimsIterator);
+      final GenericColumnSerializer timeWriter = 
setupTimeWriter(segmentWriteOutMedium, indexSpec);
+      final ArrayList<GenericColumnSerializer> metricWriters =
+          setupMetricsWriters(segmentWriteOutMedium, mergedMetrics, 
metricFormats, indexSpec, basePrefix);
+      IndexMergeResult indexMergeResult = mergeIndexesAndWriteColumns(
+          adapters,
+          progress,
+          timeAndDimsIterator,
+          timeWriter,
+          metricWriters,
+          mergers
+      );
+
+      /************ Create Inverted Indexes and Finalize Build Columns 
*************/
+      final String section = "build inverted index and columns";
+      progress.startSection(section);
+      makeTimeColumn(v10Smoosher, progress, timeWriter, indexSpec, basePrefix 
+ ColumnHolder.TIME_COLUMN_NAME);
+      makeMetricsColumns(
+          v10Smoosher,
+          progress,
+          mergedMetrics,
+          metricFormats,
+          metricWriters,
+          indexSpec,
+          basePrefix
+      );
+
+      for (int i = 0; i < mergedDimensions.size(); i++) {
+        DimensionMergerV9 merger = mergers.get(i);
+        merger.writeIndexes(indexMergeResult.rowNumConversions);
+        if (!merger.hasOnlyNulls()) {
+          ColumnDescriptor columnDesc = merger.makeColumnDescriptor();
+          makeColumn(v10Smoosher, basePrefix + mergedDimensions.get(i), 
columnDesc);
+        } else if 
(dimensionsSpecInspector.shouldStore(mergedDimensions.get(i))) {
+          // shouldStore AND hasOnlyNulls
+          ColumnDescriptor columnDesc = ColumnDescriptor
+              .builder()
+              .setValueType(dimFormats.get(i).getLogicalType().getType())
+              .addSerde(new NullColumnPartSerde(indexMergeResult.rowCount, 
indexSpec.getBitmapSerdeFactory()))
+              .build();
+          makeColumn(v10Smoosher, basePrefix + mergedDimensions.get(i), 
columnDesc);
+        }
+      }
+
+      progress.stopSection(section);
+
+      // Recompute the projections.
+      final Metadata finalMetadata;
+      if (segmentMetadata == null || 
CollectionUtils.isNullOrEmpty(segmentMetadata.getProjections())) {
+        finalMetadata = segmentMetadata;
+      } else {
+        finalMetadata = makeProjections(
+            v10Smoosher,
+            segmentMetadata.getProjections(),
+            adapters,
+            indexSpec,
+            segmentWriteOutMedium,
+            progress,
+            outDir,
+            closer,
+            mergersMap,
+            segmentMetadata
+        );
+      }
+
+      List<ProjectionMetadata> projections = new ArrayList<>();
+      // ingestion current builds v9 metadata... translate v9 metadata and 
projection stuff to v10 format
+      projections.add(
+          ProjectionMetadata.forBaseTable(indexMergeResult.rowCount, 
mergedDimensions, finalMetadata)
+      );
+      // convert v9 projections to v10 projections
+      for (AggregateProjectionMetadata aggMeta : 
segmentMetadata.getProjections()) {

Review Comment:
   ## Dereferenced variable may be null
   
   Variable [segmentMetadata](1) may be null at this access, as suggested by [this](2) null guard and [this](3) null guard.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/10612)



##########
processing/src/main/java/org/apache/druid/segment/SegmentUtils.java:
##########
@@ -79,6 +79,14 @@
       return version;
     }
 
+    final File v10IndexFile = new File(inDir, IndexIO.V10_FILE_NAME);
+    if (v10IndexFile.exists()) {

Review Comment:
   ## Uncontrolled data used in path expression
   
   This path depends on a [user-provided value](1).
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/10607)



##########
processing/src/test/java/org/apache/druid/segment/projections/ProjectionMetadataTest.java:
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.projections;
+
+class ProjectionMetadataTest

Review Comment:
   ## Unused classes and interfaces
   
   Unused class: ProjectionMetadataTest is not referenced within this codebase. If it is not used as an external API, it should be removed.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/10615)



##########
processing/src/main/java/org/apache/druid/segment/projections/AggregateProjectionSchema.java:
##########
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.projections;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Lists;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.query.OrderBy;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.filter.DimFilter;
+import org.apache.druid.segment.CursorBuildSpec;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.utils.CollectionUtils;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+public class AggregateProjectionSchema implements ProjectionSchema
+{
+  /**
+   * It is not likely the best way to find the best matching projections, but 
it is the one we have for now. This
+   * comparator is used to sort all the projections in a segment "best" first, 
where best is defined as fewest grouping
+   * columns, most virtual columns and aggregators, as an approximation of 
likely to have the fewest number of rows to
+   * scan.
+   */
+  public static final Comparator<AggregateProjectionSchema> COMPARATOR = (o1, 
o2) -> {
+    // coarsest granularity first
+    if 
(o1.getEffectiveGranularity().isFinerThan(o2.getEffectiveGranularity())) {
+      return 1;
+    }
+    if 
(o2.getEffectiveGranularity().isFinerThan(o1.getEffectiveGranularity())) {
+      return -1;
+    }
+    // fewer dimensions first
+    final int dimsCompare = Integer.compare(
+        o1.groupingColumns.size(),
+        o2.groupingColumns.size()
+    );
+    if (dimsCompare != 0) {
+      return dimsCompare;
+    }
+    // more metrics first
+    int metCompare = Integer.compare(o2.aggregators.length, 
o1.aggregators.length);
+    if (metCompare != 0) {
+      return metCompare;
+    }
+    // more virtual columns first
+    final int virtCompare = Integer.compare(
+        o2.virtualColumns.getVirtualColumns().length,
+        o1.virtualColumns.getVirtualColumns().length
+    );
+    if (virtCompare != 0) {
+      return virtCompare;
+    }
+    return o1.name.compareTo(o2.name);
+  };
+
+  private final String name;
+  @Nullable
+  private final String timeColumnName;
+  @Nullable
+  private final DimFilter filter;
+  private final VirtualColumns virtualColumns;
+  private final List<String> groupingColumns;
+  private final AggregatorFactory[] aggregators;
+  private final List<OrderBy> ordering;
+  private final List<OrderBy> orderingWithTimeSubstitution;
+
+  // computed fields
+  private final int timeColumnPosition;
+  private final Granularity effectiveGranularity;
+
+  @JsonCreator
+  public AggregateProjectionSchema(
+      @JsonProperty("name") String name,
+      @JsonProperty("timeColumnName") @Nullable String timeColumnName,
+      @JsonProperty("filter") @Nullable DimFilter filter,
+      @JsonProperty("virtualColumns") @Nullable VirtualColumns virtualColumns,
+      @JsonProperty("groupingColumns") @Nullable List<String> groupingColumns,
+      @JsonProperty("aggregators") @Nullable AggregatorFactory[] aggregators,
+      @JsonProperty("ordering") List<OrderBy> ordering
+  )
+  {
+    if (name == null || name.isEmpty()) {
+      throw DruidException.defensive("projection schema name cannot be null or 
empty");
+    }
+    this.name = name;
+    if (CollectionUtils.isNullOrEmpty(groupingColumns) && (aggregators == null 
|| aggregators.length == 0)) {
+      throw DruidException.defensive(
+          "projection schema[%s] groupingColumns and aggregators must not both 
be null or empty",
+          name
+      );
+    }
+    if (ordering == null) {
+      throw DruidException.defensive("projection schema[%s] ordering must not 
be null", name);
+    }
+    this.filter = filter;
+    this.virtualColumns = virtualColumns == null ? VirtualColumns.EMPTY : 
virtualColumns;
+    this.groupingColumns = groupingColumns == null ? Collections.emptyList() : 
groupingColumns;
+    this.aggregators = aggregators == null ? new AggregatorFactory[0] : 
aggregators;
+    this.ordering = ordering;
+
+    int foundTimePosition = -1;
+    this.orderingWithTimeSubstitution = 
Lists.newArrayListWithCapacity(ordering.size());
+    Granularity granularity = null;
+    for (int i = 0; i < ordering.size(); i++) {
+      OrderBy orderBy = ordering.get(i);
+      if (orderBy.getColumnName().equals(timeColumnName)) {
+        orderingWithTimeSubstitution.add(new 
OrderBy(ColumnHolder.TIME_COLUMN_NAME, orderBy.getOrder()));
+        foundTimePosition = i;
+        timeColumnName = groupingColumns.get(foundTimePosition);

Review Comment:
   ## Dereferenced variable may be null
   
   Variable [groupingColumns](1) may be null at this access as suggested by 
[this](2) null guard.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/10613)



##########
processing/src/main/java/org/apache/druid/segment/projections/TableProjectionSchema.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.projections;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Lists;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.query.OrderBy;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.AggregateProjectionMetadata;
+import org.apache.druid.segment.Metadata;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.utils.CollectionUtils;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+public class TableProjectionSchema implements BaseTableProjectionSchema
+{
+  public static final String TYPE_NAME = "base-table";
+
+  public static TableProjectionSchema fromMetadata(List<String> dims, Metadata 
metadata)
+  {
+    return new TableProjectionSchema(
+        VirtualColumns.create(
+            Granularities.toVirtualColumn(metadata.getQueryGranularity(), 
Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME)
+        ),
+        dims,
+        metadata.getAggregators(),
+        metadata.getOrdering()
+    );
+  }
+
+  private final VirtualColumns virtualColumns;
+  private final List<String> columnNames;
+  @Nullable
+  private final AggregatorFactory[] aggregators;
+  private final List<OrderBy> ordering;
+
+  // computed fields
+  private final int timeColumnPosition;
+  private final Granularity effectiveGranularity;
+
+  @JsonCreator
+  public TableProjectionSchema(
+      @JsonProperty("virtualColumns") VirtualColumns virtualColumns,
+      @JsonProperty("columns") List<String> columns,
+      @JsonProperty("aggregators") @Nullable AggregatorFactory[] aggregators,
+      @JsonProperty("ordering") List<OrderBy> ordering
+  )
+  {
+    if (CollectionUtils.isNullOrEmpty(columns)) {
+      throw DruidException.defensive("base table projection schema columns 
must not be null or empty");
+    }
+    if (ordering == null) {
+      throw DruidException.defensive("base table projection schema ordering 
must not be null");
+    }
+    this.virtualColumns = virtualColumns == null ? VirtualColumns.EMPTY : 
virtualColumns;
+    this.columnNames = columns;
+    this.aggregators = aggregators == null ? new AggregatorFactory[0] : 
aggregators;
+    this.ordering = ordering;
+
+    int foundTimePosition = -1;
+    Granularity granularity = null;
+    for (int i = 0; i < ordering.size(); i++) {
+      OrderBy orderBy = ordering.get(i);
+      if (orderBy.getColumnName().equals(ColumnHolder.TIME_COLUMN_NAME)) {
+        foundTimePosition = i;
+        // base tables always store granularity virtual column as this name
+        final VirtualColumn vc = 
this.virtualColumns.getVirtualColumn(Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME);
+        if (vc != null) {
+          granularity = Granularities.fromVirtualColumn(vc);
+        } else {
+          granularity = Granularities.NONE;
+        }
+      }
+    }
+    if (granularity == null) {
+      throw DruidException.defensive("base table doesn't have a [%s] column?", 
ColumnHolder.TIME_COLUMN_NAME);
+    }
+    this.timeColumnPosition = foundTimePosition;
+    this.effectiveGranularity = granularity;
+  }
+
+  @JsonIgnore
+  @Override
+  public List<String> getColumnNames()
+  {
+    List<String> columns = new ArrayList<>(columnNames.size() + (aggregators 
!= null ? aggregators.length : 0));
+    columns.addAll(columnNames);
+    for (AggregatorFactory aggregator : aggregators) {

Review Comment:
   ## Dereferenced variable may be null
   
   Variable [aggregators](1) may be null at this access as suggested by 
[this](2) null guard.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/10614)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to