clintropolis commented on code in PR #18909:
URL: https://github.com/apache/druid/pull/18909#discussion_r2684206202


##########
processing/src/main/java/org/apache/druid/frame/file/FrameFileWriter.java:
##########
@@ -95,18 +107,70 @@ public static FrameFileWriter open(
         compressionBuffer,
         AppendableMemory.create(allocator),
         AppendableMemory.create(allocator),
-        byteTracker
+        byteTracker,
+        wireTransferableContext
     );
   }
 
   /**
-   * Write a frame.
+   * Write a batch of data to the file. If legacy frame serialization is 
enabled and the RowsAndColumns can be
+   * converted to a Frame, it will be written as raw frame bytes with {@link 
#MARKER_FRAME}. Otherwise, it will be
+   * written using {@link WireTransferable} with {@link #MARKER_RAC}.
    *
-   * @param frame     the frame
+   * @param rac       the RowsAndColumns to write
    * @param partition partition number for a partitioned frame file, or {@link 
#NO_PARTITION} for an unpartitioned file.
    *                  Must be monotonically increasing.
    */
-  public void writeFrame(final Frame frame, final int partition) throws 
IOException
+  public void writeRAC(final RowsAndColumns rac, final int partition) throws 
IOException

Review Comment:
   any reason not to just call this `write`?



##########
processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/WireTransferable.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.rowsandcols.semantic;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.guice.DruidSecondaryModule;
+import org.apache.druid.java.util.common.ByteBufferUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+/**
+ * A Semantic interface that enables serializing a {@link RowsAndColumns} over 
the wire.
+ * <p>
+ * Serialization and Deserialization are a weird beast.  It is very easy to 
follow an Object-Oriented mechanism for
+ * serializing out an object, but when you want to deserialize, it is unclear 
exactly which class should be
+ * deserialized.  This requires extra metadata that helps point the 
deserializing code to how to interpret the
+ * bytes.
+ * <p>
+ * The {@link WireTransferable} works by serializing the object as a singular 
byte array, the first byte of the byte array
+ * is interpretted as an unsigned integer (N) and represents the length of the 
"extra metadata" to identify the correct
+ * deserializer.  Those N bytes are read as a utf8 String and used to look up 
the deserialization logic.  This
+ * identifier must be registered with {@link 
DruidSecondaryModule#getWireTransferableDeserializerBinder}, which will
+ * use the identifier to look up the logic and deserialize the RowsAndColumns 
object.
+ * <p>
+ * There is, intentionally, no default implementation of {@link 
WireTransferable} that just wraps a RowsAndColumns,
+ * this is because the point at which data is transfered over the wire tends 
to be a very sensitive point in terms of
+ * what the best format for transmission is and what optimizations are 
available.  As such, we don't want a concrete
+ * class that makes its way to the wire to accidentally get a 
least-common-denominator serialization and instead want
+ * it to be explicitly chosen.
+ */
+public interface WireTransferable
+{
+  /**
+   * Serializes the object into bytes.  This method is passed a jackson 
ObjectMapper that can be used to serialize
+   * objects if needed.  In general, it's better to generate binary 
serializations and not rely on the mapper
+   * but it's there as an expediency layer to support some compatibility 
stuff.  Hopefully it goes away at some point.
+   *
+   * @param mapper ObjectMapper
+   * @return bytes that represent the wire-format of this object
+   * @throws IOException a problem with IO
+   */
+  ByteArrayOffsetAndLen serializedBytes(ObjectMapper mapper) throws 
IOException;
+
+  interface Deserializer
+  {
+    /**
+     * Deserializes a ByteBuffer into a RowsAndColumns. The deserialized 
object owns the passed-in buffer as well
+     * as the data it points to. It is safe for the deserializer to mutate the 
buffer, retain a reference to it, etc.
+     *
+     * @param mapper ObjectMapper
+     * @param theBytes ByteBuffer to deserialize, in little-endian order
+     * @return RowsAndColumns object deserialized from the ByteBuffer
+     */
+    RowsAndColumns deserialize(ObjectMapper mapper, ByteBuffer theBytes);
+  }
+
+  /**
+   * A holder class for a byte array, offset and length, this exists to 
minimize copies.  It's pretty similar to a
+   * ByteBuffer, but is separate because jackson cannot write a ByteBuffer 
anyway so might as well wrap in this.
+   */
+  class ByteArrayOffsetAndLen
+  {
+    private final byte[] array;
+    private final int offset;
+    private final int length;
+
+    public ByteArrayOffsetAndLen(byte[] array, int offset, int length)

Review Comment:
   what is the use case for `offset`? Afaict it seems like it always set to 0, 
but trying to wrap my head around what it would mean/how it would be used if it 
was set to non-zero, like i guess packing multiple `WireTransferrable` into the 
same blob?



##########
processing/src/main/java/org/apache/druid/frame/channel/ReadableFrameChannel.java:
##########
@@ -20,54 +20,80 @@
 package org.apache.druid.frame.channel;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.frame.Frame;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
 
 import java.io.Closeable;
+import java.util.NoSuchElementException;
 
 /**
- * Interface for reading a sequence of frames. Supports nonblocking reads 
through the {@link #canRead()} and
+ * Interface for reading a sequence of batches of data. Supports nonblocking 
reads through the {@link #canRead()} and
  * {@link #readabilityFuture()} methods.
  *
  * May be implemented using an in-memory queue, disk file, stream, etc.
  *
  * Channels implementing this interface are used by a single reader; they do 
not support concurrent reads.
+ *
+ * Despite its name, instances of this class can typically return any {@link 
RowsAndColumns} through the
+ * {@link #readRAC()} method.
  */
 public interface ReadableFrameChannel extends Closeable
 {
   /**
-   * Returns whether this channel is finished. Finished channels will not 
generate any further frames or errors.
+   * Returns whether this channel is finished. Finished channels will not 
generate any further data batches or errors.
    *
    * Generally, once you discover that a channel is finished, you should call 
{@link #close()} and then
    * discard it.
    *
-   * Note that it is possible for a channel to be unfinished and also have no 
available frames or errors. This happens
-   * when it is not in a ready-for-reading state. See {@link 
#readabilityFuture()} for details.
+   * Note that it is possible for a channel to be unfinished and also have no 
available data batches or errors.
+   * This happens when it is not in a ready-for-reading state. See {@link 
#readabilityFuture()} for details.
    */
   boolean isFinished();
 
   /**
-   * Returns whether this channel has a frame or error condition currently 
available. If this method returns true, then
-   * you can call {@link #read()} to retrieve the frame or error.
+   * Returns whether this channel has a batch of data or error condition 
currently available. If this method returns
+   * true, then you can call {@link #read()} or {@link #readRAC()} to retrieve 
the batch or error.
    *
-   * Note that it is possible for a channel to be unfinished and also have no 
available frames or errors. This happens
+   * Note that it is possible for a channel to be unfinished and also have no 
available batches or errors. This happens
    * when it is not in a ready-for-reading state. See {@link 
#readabilityFuture()} for details.
    */
   boolean canRead();
 
   /**
-   * Returns the next available frame from this channel.
+   * Returns the next available batch of data from this channel as a {@link 
Frame}.
+   *
+   * Before calling this method, you should check {@link #canRead()} to ensure 
there is a batch of data or
+   * error available.
+   *
+   * @throws NoSuchElementException if there is no batch currently available
+   * @throws DruidException         if the available batch was not available 
as a {@link Frame}
+   */
+  default Frame read()
+  {
+    final RowsAndColumns rac = readRAC();
+    final Frame frame = rac.as(Frame.class);
+    if (frame != null) {
+      return frame;
+    } else {
+      throw DruidException.defensive("Got RAC[%s] which is not a frame", rac);
+    }
+  }
+
+  /**
+   * Returns the next available batch of data from this channel as a {@link 
RowsAndColumns}.
    *
-   * Before calling this method, you should check {@link #canRead()} to ensure 
there is a frame or
+   * Before calling this method, you should check {@link #canRead()} to ensure 
there is a batch of data or
    * error available.
    *
-   * @throws java.util.NoSuchElementException if there is no frame currently 
available
+   * @throws NoSuchElementException if there is no batch currently available
    */
-  Frame read();
+  RowsAndColumns readRAC();

Review Comment:
   it seems like this method should be called `read` and the other should be 
`readAsFrame` or `readFrame`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to