github-advanced-security[bot] commented on code in PR #17493:
URL: https://github.com/apache/druid/pull/17493#discussion_r1849905567


##########
processing/src/main/java/org/apache/druid/query/aggregation/SerializedStorage.java:
##########
@@ -45,109 +44,181 @@
 {
   private final WriteOutBytes writeOutBytes;
   private final StagedSerde<T> serde;
-  private final IntSerializer intSerializer = new IntSerializer();
+  private final ByteBuffer itemOffsetsBytes;
+  private final IntBuffer itemSizes;
+
+  private final LongArrayList rowChunkOffsets = new LongArrayList();
+  private int numStored = 0;
+  private int maxSize = 0;
 
   public SerializedStorage(WriteOutBytes writeOutBytes, StagedSerde<T> serde)
+  {
+    this(writeOutBytes, serde, 4096);
+  }
+
+  public SerializedStorage(WriteOutBytes writeOutBytes, StagedSerde<T> serde, 
int chunkSize)
   {
     this.writeOutBytes = writeOutBytes;
     this.serde = serde;
+
+    this.itemOffsetsBytes = ByteBuffer.allocate(chunkSize * 
Integer.BYTES).order(ByteOrder.nativeOrder());
+    this.itemSizes = itemOffsetsBytes.asIntBuffer();
   }
 
   public void store(@Nullable T value) throws IOException
   {
     byte[] bytes = serde.serialize(value);
 
-    writeOutBytes.write(intSerializer.serialize(bytes.length));
-    writeOutBytes.write(bytes);
+    maxSize = Math.max(maxSize, bytes.length);
+    itemSizes.put(bytes.length);
+    if (bytes.length > 0) {
+      writeOutBytes.write(bytes);
+    }
+
+    ++numStored;
+    if (itemSizes.remaining() == 0) {
+      rowChunkOffsets.add(writeOutBytes.size());
+      writeOutBytes.write(itemOffsetsBytes);
+      itemOffsetsBytes.clear();
+      itemSizes.clear();
+    }
   }
 
+  public int numStored()
+  {
+    return numStored;
+  }
+
+  /**
+   * Generates an iterator over everything that has been stored.  Also 
signifies the end of storing objects.
+   * iterator() can be called multiple times if needed, but after iterator() 
is called, store() can no longer be
+   * called.
+   *
+   * @return an iterator
+   * @throws IOException on failure
+   */
   public IOIterator<T> iterator() throws IOException
   {
-    return new DeserializingIOIterator<>(writeOutBytes.asInputStream(), serde);
+    if (itemSizes.position() != itemSizes.limit()) {
+      rowChunkOffsets.add(writeOutBytes.size());
+      itemOffsetsBytes.limit(itemSizes.position() * Integer.BYTES);
+      writeOutBytes.write(itemOffsetsBytes);
+
+      // Move the limit to the position so that we fail subsequent writes and 
indicate that we are done
+      itemSizes.limit(itemSizes.position());
+    }
+
+    return new DeserializingIOIterator<>(
+        writeOutBytes,
+        rowChunkOffsets,
+        numStored,
+        itemSizes.capacity(),
+        maxSize,
+        serde
+    );
   }
 
   private static class DeserializingIOIterator<T> implements IOIterator<T>
   {
-    private static final int NEEDS_READ = -2;
-    private static final int EOF = -1;
+    private static final ByteBuffer EMPTY_BUFFER = 
ByteBuffer.allocate(0).asReadOnlyBuffer();
 
-    private final byte[] intBytes;
-    private final BufferedInputStream inputStream;
+    private final WriteOutBytes medium;
+    private final LongArrayList rowChunkOffsets;
+    private final int numEntries;
+    private ByteBuffer tmpBuf;
     private final StagedSerde<T> serde;
 
-    private int nextSize;
-
-    public DeserializingIOIterator(InputStream inputStream, StagedSerde<T> 
serde)
+    private final ByteBuffer itemOffsetsBytes;
+    private final int[] itemSizes;
+
+    private long itemStartOffset;
+    private int chunkId = 0;
+    private int currId = 0;
+    private int itemIndex;
+
+    public DeserializingIOIterator(
+        WriteOutBytes medium,
+        LongArrayList rowChunkOffsets,
+        int numEntries,
+        int chunkSize,
+        int maxSize,
+        StagedSerde<T> serde
+    )
     {
-      this.inputStream = new BufferedInputStream(inputStream);
+      this.medium = medium;
+      this.rowChunkOffsets = rowChunkOffsets;
+      this.numEntries = numEntries;
+      this.tmpBuf = 
ByteBuffer.allocate(maxSize).order(ByteOrder.nativeOrder());
       this.serde = serde;
-      intBytes = new byte[Integer.BYTES];
-      nextSize = NEEDS_READ;
+
+      this.itemOffsetsBytes = ByteBuffer.allocate(chunkSize * 
Integer.BYTES).order(ByteOrder.nativeOrder());
+      this.itemSizes = new int[chunkSize];
+      this.itemIndex = chunkSize;
     }
 
     @Override
-    public boolean hasNext() throws IOException
+    public boolean hasNext()
     {
-      return getNextSize() > EOF;
+      return currId < numEntries;
     }
 
     @Override
     public T next() throws IOException
     {
-      int currentNextSize = getNextSize();
-
-      if (currentNextSize == -1) {
-        throw new NoSuchElementException("end of buffer reached");
+      if (currId >= numEntries) {
+        throw new NoSuchElementException();
       }
 
-      byte[] nextBytes = new byte[currentNextSize];
-      int bytesRead = 0;
-
-      while (bytesRead < currentNextSize) {
-        int result = inputStream.read(nextBytes, bytesRead, currentNextSize - 
bytesRead);
-
-        if (result == -1) {
-          throw new NoSuchElementException("unexpected end of buffer reached");
+      if (itemIndex >= itemSizes.length) {
+        if (chunkId == 0) {
+          itemStartOffset = 0;
+        } else {
+          if (itemStartOffset != rowChunkOffsets.getLong(chunkId - 1)) {
+            throw DruidException.defensive(
+                "Should have read up to the start of the offsets [,d], "
+                + "but for some reason the values [%,d] don't align.  Possible 
corruption?",
+                rowChunkOffsets.getLong(chunkId - 1),
+                itemStartOffset
+            );

Review Comment:
   ## Unused format argument
   
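   This format call refers to 1 argument(s) but supplies 2 argument(s).
   The first placeholder in the message is written `[,d]` (the leading `%` is
   missing), so `[%,d]` is the only format specifier and the second argument,
   `itemStartOffset`, is never used. Changing `[,d]` to `[%,d]` lets the
   message report both the expected chunk offset and the actual offset.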
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/8469)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to