the-other-tim-brown commented on code in PR #18098:
URL: https://github.com/apache/hudi/pull/18098#discussion_r2957154664


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala:
##########
@@ -0,0 +1,732 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.blob
+
+import org.apache.hudi.HoodieSparkUtils.sparkAdapter
+import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType}
+import org.apache.hudi.io.SeekableDataInputStream
+import org.apache.hudi.storage.{HoodieStorage, HoodieStorageUtils, 
StorageConfiguration, StoragePath}
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Dataset, Encoders, Row}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{GenericRowWithSchema, 
SpecificInternalRow}
+import org.apache.spark.sql.types.{BinaryType, StructField, StructType}
+import org.slf4j.LoggerFactory
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * Batched byte range reader that optimizes I/O by combining consecutive reads 
for out-of-line data.
+ *
+ * This reader analyzes sequences of read requests within a partition and 
merges
+ * consecutive or nearby reads into single I/O operations. This significantly 
reduces
+ * the number of seeks and reads when processing sorted data.
+ *
+ * <h3>Schema Requirement:</h3>
+ * The blob column must match the schema defined in {@link HoodieSchema.Blob}:
+ * <pre>
+ * struct {
+ *   type: string                   // "inline" or "out_of_line"
+ *   data: binary (nullable)       // inline data (null for out_of_line)
+ *   reference: struct (nullable) { // file reference (null for inline)
+ *     external_path: string
+ *     offset: long
+ *     length: long
+ *     managed: boolean
+ *   }
+ * }
+ * </pre>
+ *
+ * <h3>Key Features:</h3>
+ * <ul>
+ *   <li>Batches consecutive reads from the same file</li>
+ *   <li>Configurable gap threshold for merging nearby reads</li>
+ *   <li>Lookahead buffer to identify batch opportunities</li>
+ *   <li>Preserves input row order in output</li>
+ * </ul>
+ *
+ * <h3>Usage Example:</h3>
+ * {{{
+ * import org.apache.hudi.udf.BatchedByteRangeReader
+ * import org.apache.spark.sql.functions._
+ * // Read a table with a blob column (e.g. image_data)
+ * val df = spark.read.format("hudi").load("/my_path").select("image_data", 
"record_id")
+ *
+ * // Read with batching (best when data is sorted by external_path, offset)
+ * val result = BatchedByteRangeReader.readBatched(df, structColName = 
"file_info")
+ *
+ * // Result has: image_data, record_id, data
+ * result.show()
+ * }}}
+ *
+ * <h3>Performance Tips:</h3>
+ * <ul>
+ *   <li>Sort input by (blob.reference.external_path, blob.reference.offset) 
for maximum batching effectiveness</li>
+ *   <li>Increase lookaheadSize for better batch detection (at cost of 
memory)</li>
+ *   <li>Tune maxGapBytes based on your data access patterns</li>
+ * </ul>
+ *
+ * @param storage        HoodieStorage instance for file I/O
+ * @param maxGapBytes    Maximum gap between ranges to consider for batching 
(default: 4KB)
+ * @param lookaheadSize  Number of rows to buffer for batch detection 
(default: 50)
+ */
+class BatchedBlobReader(
+    storage: HoodieStorage,
+    maxGapBytes: Int = 4096,
+    lookaheadSize: Int = 50) {
+
+  private val logger = LoggerFactory.getLogger(classOf[BatchedBlobReader])
+
+  /**
+   * Process a partition iterator, batching consecutive reads.
+   *
+   * This method consumes the input iterator and produces an output iterator
+   * with each row containing the original data plus a "data" column with the
+   * bytes read from the file.
+   *
+   * @param rows         Iterator of input rows with struct column
+   * @param structColIdx Index of the struct column in the row
+   * @param outputSchema Schema for output rows
+   * @param accessor     Type class for accessing row fields
+   * @param builder      Type class for building output rows
+   * @tparam R           Row type (Row or InternalRow)
+   * @return Iterator of output rows with data column added
+   */
+  def processPartition[R](
+      rows: Iterator[R],
+      structColIdx: Int,
+      outputSchema: StructType)
+      (implicit accessor: RowAccessor[R], builder: RowBuilder[R]): Iterator[R] 
= {
+
+    // Create buffered iterator for lookahead
+    val bufferedRows = rows.buffered
+
+    // Result buffer to maintain order
+    val resultIterator = new Iterator[R] {
+      private var currentBatch: Iterator[R] = Iterator.empty
+      private var rowIndex = 0L
+
+      override def hasNext: Boolean = {
+        if (currentBatch.hasNext) {
+          true
+        } else if (bufferedRows.hasNext) {
+          // Process next batch
+          currentBatch = processNextBatch()
+          currentBatch.hasNext
+        } else {
+          false
+        }
+      }
+
+      override def next(): R = {
+        if (!hasNext) {
+          throw new NoSuchElementException("No more rows")
+        }
+        currentBatch.next()
+      }
+
+      /**
+       * Collect and process the next batch of rows.
+       */
+      private def processNextBatch(): Iterator[R] = {
+        // Collect up to lookaheadSize rows with their original indices
+        val batch = collectBatch()
+
+        if (batch.isEmpty) {
+          Iterator.empty
+        } else {
+          // Partition the batch into three groups
+          val (inlineRows, outOfLineRows) = 
batch.partition(_.inlineBytes.isDefined)
+          val (wholeFileRows, rangeRows) = outOfLineRows.partition(_.length < 
0)
+
+          // Case 1: Inline — return bytes directly without I/O
+          val inlineResults = inlineRows.map { ri =>

Review Comment:
   Right but I think it's easy enough to add reader support in this task so 
might as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to