This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 7c465c4fd chore: Remove all remaining uses of legacy BatchReader from Comet [iceberg] (#3468)
7c465c4fd is described below
commit 7c465c4fdc2d3d125d559e82d5abc6c51d5fedd4
Author: Andy Grove <[email protected]>
AuthorDate: Wed Feb 25 09:41:41 2026 -0700
chore: Remove all remaining uses of legacy BatchReader from Comet [iceberg] (#3468)
---
.../java/org/apache/comet/parquet/BatchReader.java | 131 +-----------
.../comet/parquet/IcebergCometBatchReader.java | 2 +
.../main/scala/org/apache/comet/CometConf.scala | 17 --
.../comet/parquet/CometReaderThreadPool.scala | 5 -
.../comet/parquet/CometParquetFileFormat.scala | 119 ++++-------
.../CometParquetPartitionReaderFactory.scala | 233 ---------------------
.../apache/comet/parquet/CometParquetScan.scala | 94 ---------
.../rules/EliminateRedundantTransitions.scala | 5 +-
.../spark/sql/comet/CometNativeScanExec.scala | 4 +-
.../org/apache/spark/sql/comet/CometScanExec.scala | 49 +----
.../apache/comet/parquet/ParquetReadSuite.scala | 208 +-----------------
.../apache/comet/rules/CometScanRuleSuite.scala | 6 -
.../spark/sql/benchmark/CometReadBenchmark.scala | 24 ---
13 files changed, 56 insertions(+), 841 deletions(-)
diff --git a/common/src/main/java/org/apache/comet/parquet/BatchReader.java b/common/src/main/java/org/apache/comet/parquet/BatchReader.java
index d59145459..5a0bc9f6d 100644
--- a/common/src/main/java/org/apache/comet/parquet/BatchReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/BatchReader.java
@@ -24,10 +24,6 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.*;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
import scala.Option;
@@ -36,9 +32,7 @@ import org.slf4j.LoggerFactory;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
@@ -87,7 +81,10 @@ import org.apache.comet.vector.CometVector;
* reader.close();
* }
* </pre>
+ *
+ * @deprecated since 0.14.0. This class is kept for Iceberg compatibility only.
*/
+@Deprecated
@IcebergApi
public class BatchReader extends RecordReader<Void, ColumnarBatch> implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(FileReader.class);
@@ -110,8 +107,6 @@ public class BatchReader extends RecordReader<Void, ColumnarBatch> implements Cl
protected AbstractColumnReader[] columnReaders;
private CometSchemaImporter importer;
protected ColumnarBatch currentBatch;
- private Future<Option<Throwable>> prefetchTask;
- private LinkedBlockingQueue<Pair<PageReadStore, Long>> prefetchQueue;
private FileReader fileReader;
private boolean[] missingColumns;
protected boolean isInitialized;
@@ -363,26 +358,7 @@ public class BatchReader extends RecordReader<Void, ColumnarBatch> implements Cl
}
}
- // Pre-fetching
- boolean preFetchEnabled =
- conf.getBoolean(
- CometConf.COMET_SCAN_PREFETCH_ENABLED().key(),
- (boolean) CometConf.COMET_SCAN_PREFETCH_ENABLED().defaultValue().get());
-
- if (preFetchEnabled) {
- LOG.info("Prefetch enabled for BatchReader.");
- this.prefetchQueue = new LinkedBlockingQueue<>();
- }
-
isInitialized = true;
- synchronized (this) {
- // if prefetch is enabled, `init()` is called in separate thread. When
- // `BatchReader.nextBatch()` is called asynchronously, it is possibly that
- // `init()` is not called or finished. We need to hold on `nextBatch` until
- // initialization of `BatchReader` is done. Once we are close to finish
- // initialization, we notify the waiting thread of `nextBatch` to continue.
- notifyAll();
- }
}
/**
@@ -436,51 +412,13 @@ public class BatchReader extends RecordReader<Void, ColumnarBatch> implements Cl
return currentBatch;
}
- // Only for testing
- public Future<Option<Throwable>> getPrefetchTask() {
- return this.prefetchTask;
- }
-
- // Only for testing
- public LinkedBlockingQueue<Pair<PageReadStore, Long>> getPrefetchQueue() {
- return this.prefetchQueue;
- }
-
/**
* Loads the next batch of rows.
*
* @return true if there are no more rows to read, false otherwise.
*/
public boolean nextBatch() throws IOException {
- if (this.prefetchTask == null) {
- Preconditions.checkState(isInitialized, "init() should be called first!");
- } else {
- // If prefetch is enabled, this reader will be initialized asynchronously from a
- // different thread. Wait until it is initialized
- while (!isInitialized) {
- synchronized (this) {
- try {
- // Wait until initialization of current `BatchReader` is finished (i.e., `init()`),
- // is done. It is possibly that `init()` is done after entering this while loop,
- // so a short timeout is given.
- wait(100);
-
- // Checks if prefetch task is finished. If so, tries to get exception if any.
- if (prefetchTask.isDone()) {
- Option<Throwable> exception = prefetchTask.get();
- if (exception.isDefined()) {
- throw exception.get();
- }
- }
- } catch (RuntimeException e) {
- // Spark will check certain exception e.g. `SchemaColumnConvertNotSupportedException`.
- throw e;
- } catch (Throwable e) {
- throw new IOException(e);
- }
- }
- }
- }
+ Preconditions.checkState(isInitialized, "init() should be called first!");
if (rowsRead >= totalRowCount) return false;
boolean hasMore;
@@ -547,7 +485,6 @@ public class BatchReader extends RecordReader<Void, ColumnarBatch> implements Cl
}
}
- @SuppressWarnings("deprecation")
private boolean loadNextRowGroupIfNecessary() throws Throwable {
// More rows can be read from loaded row group. No need to load next one.
if (rowsRead != totalRowsLoaded) return true;
@@ -556,21 +493,7 @@ public class BatchReader extends RecordReader<Void, ColumnarBatch> implements Cl
SQLMetric numRowGroupsMetric = metrics.get("ParquetRowGroups");
long startNs = System.nanoTime();
- PageReadStore rowGroupReader = null;
- if (prefetchTask != null && prefetchQueue != null) {
- // Wait for pre-fetch task to finish.
- Pair<PageReadStore, Long> rowGroupReaderPair = prefetchQueue.take();
- rowGroupReader = rowGroupReaderPair.getLeft();
-
- // Update incremental byte read metric. Because this metric in Spark is maintained
- // by thread local variable, we need to manually update it.
- // TODO: We may expose metrics from `FileReader` and get from it directly.
- long incBytesRead = rowGroupReaderPair.getRight();
- FileSystem.getAllStatistics().stream()
- .forEach(statistic -> statistic.incrementBytesRead(incBytesRead));
- } else {
- rowGroupReader = fileReader.readNextRowGroup();
- }
+ PageReadStore rowGroupReader = fileReader.readNextRowGroup();
if (rowGroupTimeMetric != null) {
rowGroupTimeMetric.add(System.nanoTime() - startNs);
@@ -608,48 +531,4 @@ public class BatchReader extends RecordReader<Void, ColumnarBatch> implements Cl
totalRowsLoaded += rowGroupReader.getRowCount();
return true;
}
-
- // Submits a prefetch task for this reader.
- public void submitPrefetchTask(ExecutorService threadPool) {
- this.prefetchTask = threadPool.submit(new PrefetchTask());
- }
-
- // A task for prefetching parquet row groups.
- private class PrefetchTask implements Callable<Option<Throwable>> {
- private long getBytesRead() {
- return FileSystem.getAllStatistics().stream()
- .mapToLong(s -> s.getThreadStatistics().getBytesRead())
- .sum();
- }
-
- @Override
- public Option<Throwable> call() throws Exception {
- // Gets the bytes read so far.
- long baseline = getBytesRead();
-
- try {
- init();
-
- while (true) {
- PageReadStore rowGroupReader = fileReader.readNextRowGroup();
-
- if (rowGroupReader == null) {
- // Reaches the end of row groups.
- return Option.empty();
- } else {
- long incBytesRead = getBytesRead() - baseline;
-
- prefetchQueue.add(Pair.of(rowGroupReader, incBytesRead));
- }
- }
- } catch (Throwable e) {
- // Returns exception thrown from the reader. The reader will re-throw it.
- return Option.apply(e);
- } finally {
- if (fileReader != null) {
- fileReader.closeStream();
- }
- }
- }
- }
}
diff --git a/common/src/main/java/org/apache/comet/parquet/IcebergCometBatchReader.java b/common/src/main/java/org/apache/comet/parquet/IcebergCometBatchReader.java
index d4bfa2b87..bd66f2dea 100644
--- a/common/src/main/java/org/apache/comet/parquet/IcebergCometBatchReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/IcebergCometBatchReader.java
@@ -24,9 +24,11 @@ import java.util.HashMap;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarBatch;
+import org.apache.comet.IcebergApi;
import org.apache.comet.vector.CometVector;
/** This class is a public interface used by Apache Iceberg to read batches using Comet */
+@IcebergApi
public class IcebergCometBatchReader extends BatchReader {
public IcebergCometBatchReader(int numColumns, StructType schema) {
this.columnReaders = new AbstractColumnReader[numColumns];
diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index 5ee777f3d..41b69952a 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -675,23 +675,6 @@ object CometConf extends ShimCometConf {
.doubleConf
.createWithDefault(1.0)
- val COMET_SCAN_PREFETCH_ENABLED: ConfigEntry[Boolean] =
- conf("spark.comet.scan.preFetch.enabled")
- .category(CATEGORY_SCAN)
- .doc("Whether to enable pre-fetching feature of CometScan.")
- .booleanConf
- .createWithDefault(false)
-
- val COMET_SCAN_PREFETCH_THREAD_NUM: ConfigEntry[Int] =
- conf("spark.comet.scan.preFetch.threadNum")
- .category(CATEGORY_SCAN)
- .doc(
- "The number of threads running pre-fetching for CometScan. Effective if " +
- s"${COMET_SCAN_PREFETCH_ENABLED.key} is enabled. Note that more " +
- "pre-fetching threads means more memory requirement to store pre-fetched row groups.")
- .intConf
- .createWithDefault(2)
-
val COMET_NATIVE_LOAD_REQUIRED: ConfigEntry[Boolean] =
conf("spark.comet.nativeLoadRequired")
.category(CATEGORY_EXEC)
.doc(
diff --git a/common/src/main/scala/org/apache/comet/parquet/CometReaderThreadPool.scala b/common/src/main/scala/org/apache/comet/parquet/CometReaderThreadPool.scala
index ca13bba0c..1759ea276 100644
--- a/common/src/main/scala/org/apache/comet/parquet/CometReaderThreadPool.scala
+++ b/common/src/main/scala/org/apache/comet/parquet/CometReaderThreadPool.scala
@@ -54,11 +54,6 @@ abstract class CometReaderThreadPool {
}
-// A thread pool used for pre-fetching files.
-object CometPrefetchThreadPool extends CometReaderThreadPool {
- override def threadNamePrefix: String = "prefetch_thread"
-}
-
// Thread pool used by the Parquet parallel reader
object CometFileReaderThreadPool extends CometReaderThreadPool {
override def threadNamePrefix: String = "file_reader_thread"
diff --git a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
index e07d16d4d..7874f3774 100644
--- a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
+++ b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
@@ -57,7 +57,7 @@ import org.apache.comet.vector.CometVector
* in [[org.apache.comet.CometSparkSessionExtensions]]
* - `buildReaderWithPartitionValues`, so Spark calls Comet's Parquet reader to read values.
*/
-class CometParquetFileFormat(session: SparkSession, scanImpl: String)
+class CometParquetFileFormat(session: SparkSession)
extends ParquetFileFormat
with MetricsSupport
with ShimSQLConf {
@@ -110,8 +110,6 @@ class CometParquetFileFormat(session: SparkSession, scanImpl: String)
// Comet specific configurations
val capacity = CometConf.COMET_BATCH_SIZE.get(sqlConf)
- val nativeIcebergCompat = scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT
-
(file: PartitionedFile) => {
val sharedConf = broadcastedHadoopConf.value.value
val footer = FooterReader.readFooter(sharedConf, file)
@@ -135,85 +133,42 @@ class CometParquetFileFormat(session: SparkSession, scanImpl: String)
isCaseSensitive,
datetimeRebaseSpec)
- val recordBatchReader =
- if (nativeIcebergCompat) {
- // We still need the predicate in the conf to allow us to generate row indexes based on
- // the actual row groups read
- val pushed = if (parquetFilterPushDown) {
- filters
- // Collects all converted Parquet filter predicates. Notice that not all predicates
- // can be converted (`ParquetFilters.createFilter` returns an `Option`). That's why
- // a `flatMap` is used here.
- .flatMap(parquetFilters.createFilter)
- .reduceOption(FilterApi.and)
- } else {
- None
- }
- pushed.foreach(p => ParquetInputFormat.setFilterPredicate(sharedConf, p))
- val pushedNative = if (parquetFilterPushDown) {
- parquetFilters.createNativeFilters(filters)
- } else {
- None
- }
- val batchReader = new NativeBatchReader(
- sharedConf,
- file,
- footer,
- pushedNative.orNull,
- capacity,
- requiredSchema,
- dataSchema,
- isCaseSensitive,
- useFieldId,
- ignoreMissingIds,
- datetimeRebaseSpec.mode == CORRECTED,
- partitionSchema,
- file.partitionValues,
- metrics.asJava,
- CometMetricNode(metrics))
- try {
- batchReader.init()
- } catch {
- case e: Throwable =>
- batchReader.close()
- throw e
- }
- batchReader
- } else {
- val pushed = if (parquetFilterPushDown) {
- filters
- // Collects all converted Parquet filter predicates. Notice that not all predicates
- // can be converted (`ParquetFilters.createFilter` returns an `Option`). That's why
- // a `flatMap` is used here.
- .flatMap(parquetFilters.createFilter)
- .reduceOption(FilterApi.and)
- } else {
- None
- }
- pushed.foreach(p => ParquetInputFormat.setFilterPredicate(sharedConf, p))
-
- val batchReader = new BatchReader(
- sharedConf,
- file,
- footer,
- capacity,
- requiredSchema,
- isCaseSensitive,
- useFieldId,
- ignoreMissingIds,
- datetimeRebaseSpec.mode == CORRECTED,
- partitionSchema,
- file.partitionValues,
- metrics.asJava)
- try {
- batchReader.init()
- } catch {
- case e: Throwable =>
- batchReader.close()
- throw e
- }
- batchReader
- }
+ val pushed = if (parquetFilterPushDown) {
+ filters
+ .flatMap(parquetFilters.createFilter)
+ .reduceOption(FilterApi.and)
+ } else {
+ None
+ }
+ pushed.foreach(p => ParquetInputFormat.setFilterPredicate(sharedConf, p))
+ val pushedNative = if (parquetFilterPushDown) {
+ parquetFilters.createNativeFilters(filters)
+ } else {
+ None
+ }
+ val recordBatchReader = new NativeBatchReader(
+ sharedConf,
+ file,
+ footer,
+ pushedNative.orNull,
+ capacity,
+ requiredSchema,
+ dataSchema,
+ isCaseSensitive,
+ useFieldId,
+ ignoreMissingIds,
+ datetimeRebaseSpec.mode == CORRECTED,
+ partitionSchema,
+ file.partitionValues,
+ metrics.asJava,
+ CometMetricNode(metrics))
+ try {
+ recordBatchReader.init()
+ } catch {
+ case e: Throwable =>
+ recordBatchReader.close()
+ throw e
+ }
val iter = new RecordReaderIterator(recordBatchReader)
try {
iter.asInstanceOf[Iterator[InternalRow]]
diff --git a/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala b/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala
deleted file mode 100644
index 495054fc8..000000000
--- a/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet.parquet
-
-import scala.collection.mutable
-import scala.jdk.CollectionConverters._
-
-import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
-import org.apache.parquet.hadoop.ParquetInputFormat
-import org.apache.parquet.hadoop.metadata.ParquetMetadata
-import org.apache.spark.TaskContext
-import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
-import org.apache.spark.sql.connector.read.InputPartition
-import org.apache.spark.sql.connector.read.PartitionReader
-import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
-import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
-import org.apache.spark.sql.execution.datasources.v2.FilePartitionReaderFactory
-import org.apache.spark.sql.execution.metric.SQLMetric
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.SerializableConfiguration
-
-import org.apache.comet.{CometConf, CometRuntimeException}
-import org.apache.comet.shims.ShimSQLConf
-
-case class CometParquetPartitionReaderFactory(
- usingDataFusionReader: Boolean,
- @transient sqlConf: SQLConf,
- broadcastedConf: Broadcast[SerializableConfiguration],
- readDataSchema: StructType,
- partitionSchema: StructType,
- filters: Array[Filter],
- options: ParquetOptions,
- metrics: Map[String, SQLMetric])
- extends FilePartitionReaderFactory
- with ShimSQLConf
- with Logging {
-
- private val isCaseSensitive = sqlConf.caseSensitiveAnalysis
- private val useFieldId = CometParquetUtils.readFieldId(sqlConf)
- private val ignoreMissingIds = CometParquetUtils.ignoreMissingIds(sqlConf)
- private val pushDownDate = sqlConf.parquetFilterPushDownDate
- private val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
- private val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
- private val pushDownStringPredicate = sqlConf.parquetFilterPushDownStringPredicate
- private val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
- private val datetimeRebaseModeInRead = options.datetimeRebaseModeInRead
- private val parquetFilterPushDown = sqlConf.parquetFilterPushDown
-
- // Comet specific configurations
- private val batchSize = CometConf.COMET_BATCH_SIZE.get(sqlConf)
-
- // This is only called at executor on a Broadcast variable, so we don't want it to be
- // materialized at driver.
- @transient private lazy val preFetchEnabled = {
- val conf = broadcastedConf.value.value
-
- conf.getBoolean(
- CometConf.COMET_SCAN_PREFETCH_ENABLED.key,
- CometConf.COMET_SCAN_PREFETCH_ENABLED.defaultValue.get) &&
- !usingDataFusionReader // Turn off prefetch if native_iceberg_compat is enabled
- }
-
- private var cometReaders: Iterator[BatchReader] = _
- private val cometReaderExceptionMap = new mutable.HashMap[PartitionedFile, Throwable]()
-
- // TODO: we may want to revisit this as we're going to only support flat types at the beginning
- override def supportColumnarReads(partition: InputPartition): Boolean = true
-
- override def createColumnarReader(partition: InputPartition): PartitionReader[ColumnarBatch] = {
- if (preFetchEnabled) {
- val filePartition = partition.asInstanceOf[FilePartition]
- val conf = broadcastedConf.value.value
-
- val threadNum = conf.getInt(
- CometConf.COMET_SCAN_PREFETCH_THREAD_NUM.key,
- CometConf.COMET_SCAN_PREFETCH_THREAD_NUM.defaultValue.get)
- val prefetchThreadPool = CometPrefetchThreadPool.getOrCreateThreadPool(threadNum)
-
- this.cometReaders = filePartition.files
- .map { file =>
- // `init()` call is deferred to when the prefetch task begins.
- // Otherwise we will hold too many resources for readers which are not ready
- // to prefetch.
- val cometReader = buildCometReader(file)
- if (cometReader != null) {
- cometReader.submitPrefetchTask(prefetchThreadPool)
- }
-
- cometReader
- }
- .toSeq
- .toIterator
- }
-
- super.createColumnarReader(partition)
- }
-
- override def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] =
- throw new UnsupportedOperationException("Comet doesn't support 'buildReader'")
-
- private def buildCometReader(file: PartitionedFile): BatchReader = {
- val conf = broadcastedConf.value.value
-
- try {
- val (datetimeRebaseSpec, footer, filters) = getFilter(file)
- filters.foreach(pushed => ParquetInputFormat.setFilterPredicate(conf, pushed))
- val cometReader = new BatchReader(
- conf,
- file,
- footer,
- batchSize,
- readDataSchema,
- isCaseSensitive,
- useFieldId,
- ignoreMissingIds,
- datetimeRebaseSpec.mode == CORRECTED,
- partitionSchema,
- file.partitionValues,
- metrics.asJava)
- val taskContext = Option(TaskContext.get)
- taskContext.foreach(_.addTaskCompletionListener[Unit](_ => cometReader.close()))
- return cometReader
- } catch {
- case e: Throwable if preFetchEnabled =>
- // Keep original exception
- cometReaderExceptionMap.put(file, e)
- }
- null
- }
-
- override def buildColumnarReader(file: PartitionedFile): PartitionReader[ColumnarBatch] = {
- val cometReader = if (!preFetchEnabled) {
- // Prefetch is not enabled, create comet reader and initiate it.
- val cometReader = buildCometReader(file)
- cometReader.init()
-
- cometReader
- } else {
- // If prefetch is enabled, we already tried to access the file when in `buildCometReader`.
- // It is possibly we got an exception like `FileNotFoundException` and we need to throw it
- // now to let Spark handle it.
- val reader = cometReaders.next()
- val exception = cometReaderExceptionMap.get(file)
- exception.foreach(e => throw e)
-
- if (reader == null) {
- throw new CometRuntimeException(s"Cannot find comet file reader for $file")
- }
- reader
- }
- CometPartitionReader(cometReader)
- }
-
- def getFilter(file: PartitionedFile): (RebaseSpec, ParquetMetadata, Option[FilterPredicate]) = {
- val sharedConf = broadcastedConf.value.value
- val footer = FooterReader.readFooter(sharedConf, file)
- val footerFileMetaData = footer.getFileMetaData
- val datetimeRebaseSpec = CometParquetFileFormat.getDatetimeRebaseSpec(
- file,
- readDataSchema,
- sharedConf,
- footerFileMetaData,
- datetimeRebaseModeInRead)
-
- val pushed = if (parquetFilterPushDown) {
- val parquetSchema = footerFileMetaData.getSchema
- val parquetFilters = new ParquetFilters(
- parquetSchema,
- readDataSchema,
- pushDownDate,
- pushDownTimestamp,
- pushDownDecimal,
- pushDownStringPredicate,
- pushDownInFilterThreshold,
- isCaseSensitive,
- datetimeRebaseSpec)
- filters
- // Collects all converted Parquet filter predicates. Notice that not all predicates can be
- // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
- // is used here.
- .flatMap(parquetFilters.createFilter)
- .reduceOption(FilterApi.and)
- } else {
- None
- }
- (datetimeRebaseSpec, footer, pushed)
- }
-
- override def createReader(inputPartition: InputPartition): PartitionReader[InternalRow] =
- throw new UnsupportedOperationException("Only 'createColumnarReader' is supported.")
-
- /**
- * A simple adapter on Comet's [[BatchReader]].
- */
- protected case class CometPartitionReader(reader: BatchReader)
- extends PartitionReader[ColumnarBatch] {
-
- override def next(): Boolean = {
- reader.nextBatch()
- }
-
- override def get(): ColumnarBatch = {
- reader.currentBatch()
- }
-
- override def close(): Unit = {
- reader.close()
- }
- }
-}
diff --git a/spark/src/main/scala/org/apache/comet/parquet/CometParquetScan.scala b/spark/src/main/scala/org/apache/comet/parquet/CometParquetScan.scala
deleted file mode 100644
index 3f5025576..000000000
--- a/spark/src/main/scala/org/apache/comet/parquet/CometParquetScan.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet.parquet
-
-import scala.jdk.CollectionConverters._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.comet.CometMetricNode
-import org.apache.spark.sql.connector.read.PartitionReaderFactory
-import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
-import org.apache.spark.sql.execution.datasources.v2.FileScan
-import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
-import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
-import org.apache.spark.util.SerializableConfiguration
-
-import org.apache.comet.MetricsSupport
-
-// TODO: Consider creating a case class and patch SQL tests if needed, will make life easier.
-// currently hacking around this by setting the metrics within the object's apply method.
-trait CometParquetScan extends FileScan with MetricsSupport {
- def sparkSession: SparkSession
- def hadoopConf: Configuration
- def readDataSchema: StructType
- def readPartitionSchema: StructType
- def pushedFilters: Array[Filter]
- def options: CaseInsensitiveStringMap
-
- override def equals(obj: Any): Boolean = obj match {
- case other: CometParquetScan =>
- super.equals(other) && readDataSchema == other.readDataSchema &&
- readPartitionSchema == other.readPartitionSchema &&
- equivalentFilters(pushedFilters, other.pushedFilters)
- case _ => false
- }
-
- override def hashCode(): Int = getClass.hashCode()
-
- override def createReaderFactory(): PartitionReaderFactory = {
- val sqlConf = sparkSession.sessionState.conf
- CometParquetFileFormat.populateConf(sqlConf, hadoopConf)
- val broadcastedConf =
- sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
- CometParquetPartitionReaderFactory(
- usingDataFusionReader = false, // this value is not used since this is v2 scan
- sqlConf,
- broadcastedConf,
- readDataSchema,
- readPartitionSchema,
- pushedFilters,
- new ParquetOptions(options.asScala.toMap, sqlConf),
- metrics)
- }
-}
-
-object CometParquetScan {
- def apply(session: SparkSession, scan: ParquetScan): CometParquetScan = {
- val newScan = new ParquetScan(
- scan.sparkSession,
- scan.hadoopConf,
- scan.fileIndex,
- scan.dataSchema,
- scan.readDataSchema,
- scan.readPartitionSchema,
- scan.pushedFilters,
- scan.options,
- partitionFilters = scan.partitionFilters,
- dataFilters = scan.dataFilters) with CometParquetScan
-
- newScan.metrics = CometMetricNode.nativeScanMetrics(session.sparkContext) ++ CometMetricNode
- .parquetScanMetrics(session.sparkContext)
-
- newScan
- }
-}
diff --git a/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala b/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala
index ec3336352..ce57624b7 100644
--- a/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala
@@ -22,14 +22,13 @@ package org.apache.comet.rules
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.sideBySide
-import org.apache.spark.sql.comet.{CometBatchScanExec, CometCollectLimitExec, CometColumnarToRowExec, CometNativeColumnarToRowExec, CometNativeWriteExec, CometPlan, CometScanExec, CometSparkToColumnarExec}
+import org.apache.spark.sql.comet.{CometCollectLimitExec, CometColumnarToRowExec, CometNativeColumnarToRowExec, CometNativeWriteExec, CometPlan, CometScanExec, CometSparkToColumnarExec}
import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometShuffleExchangeExec}
import org.apache.spark.sql.execution.{ColumnarToRowExec, RowToColumnarExec, SparkPlan}
import org.apache.spark.sql.execution.adaptive.QueryStageExec
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.comet.CometConf
-import org.apache.comet.parquet.CometParquetScan
// This rule is responsible for eliminating redundant transitions between row-based and
// columnar-based operators for Comet. Currently, three potential redundant transitions are:
@@ -157,7 +156,6 @@ case class EliminateRedundantTransitions(session: SparkSession) extends Rule[Spa
* This includes:
* - CometScanExec with native_iceberg_compat and partition columns - uses
* ConstantColumnReader
- * - CometBatchScanExec with CometParquetScan (V2 Parquet path) - uses BatchReader
*/
private def hasScanUsingMutableBuffers(op: SparkPlan): Boolean = {
op match {
@@ -168,7 +166,6 @@ case class EliminateRedundantTransitions(session: SparkSession) extends Rule[Spa
case scan: CometScanExec =>
scan.scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT &&
scan.relation.partitionSchema.nonEmpty
- case scan: CometBatchScanExec => scan.scan.isInstanceOf[CometParquetScan]
case _ => false
}
}
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
index 3f2748c3e..4e68a423a 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
@@ -38,7 +38,6 @@ import org.apache.spark.util.collection._
import com.google.common.base.Objects
-import org.apache.comet.CometConf
import org.apache.comet.parquet.{CometParquetFileFormat, CometParquetUtils}
import org.apache.comet.serde.OperatorOuterClass.Operator
@@ -247,8 +246,7 @@ object CometNativeScanExec {
// https://github.com/apache/arrow-datafusion-comet/issues/190
def transform(arg: Any): AnyRef = arg match {
case _: HadoopFsRelation =>
- scanExec.relation.copy(fileFormat =
- new CometParquetFileFormat(session, CometConf.SCAN_NATIVE_DATAFUSION))(session)
+ scanExec.relation.copy(fileFormat = new CometParquetFileFormat(session))(session)
case other: AnyRef => other
case null => null
}
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
index e283f6b2c..2707f0c04 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
@@ -37,15 +37,13 @@ import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
-import org.apache.spark.sql.execution.datasources.v2.DataSourceRDD
import org.apache.spark.sql.execution.metric._
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.SerializableConfiguration
import org.apache.spark.util.collection._
import org.apache.comet.{CometConf, MetricsSupport}
-import org.apache.comet.parquet.{CometParquetFileFormat, CometParquetPartitionReaderFactory}
+import org.apache.comet.parquet.CometParquetFileFormat
/**
* Comet physical scan node for DataSource V1. Most of the code here follow Spark's
@@ -476,43 +474,13 @@ case class CometScanExec(
fsRelation: HadoopFsRelation,
readFile: (PartitionedFile) => Iterator[InternalRow],
partitions: Seq[FilePartition]): RDD[InternalRow] = {
- val hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options)
- val usingDataFusionReader: Boolean = scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT
-
- val prefetchEnabled = hadoopConf.getBoolean(
- CometConf.COMET_SCAN_PREFETCH_ENABLED.key,
- CometConf.COMET_SCAN_PREFETCH_ENABLED.defaultValue.get) &&
- !usingDataFusionReader
-
val sqlConf = fsRelation.sparkSession.sessionState.conf
- if (prefetchEnabled) {
- CometParquetFileFormat.populateConf(sqlConf, hadoopConf)
- val broadcastedConf =
- fsRelation.sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
- val partitionReaderFactory = CometParquetPartitionReaderFactory(
- scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT,
- sqlConf,
- broadcastedConf,
- requiredSchema,
- relation.partitionSchema,
- pushedDownFilters.toArray,
- new ParquetOptions(CaseInsensitiveMap(relation.options), sqlConf),
- metrics)
-
- new DataSourceRDD(
- fsRelation.sparkSession.sparkContext,
- partitions.map(Seq(_)),
- partitionReaderFactory,
- true,
- Map.empty)
- } else {
- newFileScanRDD(
- fsRelation,
- readFile,
- partitions,
- new StructType(requiredSchema.fields ++ fsRelation.partitionSchema.fields),
- new ParquetOptions(CaseInsensitiveMap(relation.options), sqlConf))
- }
+ newFileScanRDD(
+ fsRelation,
+ readFile,
+ partitions,
+ new StructType(requiredSchema.fields ++ fsRelation.partitionSchema.fields),
+ new ParquetOptions(CaseInsensitiveMap(relation.options), sqlConf))
}
override def doCanonicalize(): CometScanExec = {
@@ -556,8 +524,7 @@ object CometScanExec {
// https://github.com/apache/arrow-datafusion-comet/issues/190
def transform(arg: Any): AnyRef = arg match {
case _: HadoopFsRelation =>
- scanExec.relation.copy(fileFormat = new CometParquetFileFormat(session, scanImpl))(
- session)
+ scanExec.relation.copy(fileFormat = new CometParquetFileFormat(session))(session)
case other: AnyRef => other
case null => null
}
diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
index 928e66b29..1495eb34e 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
@@ -19,7 +19,7 @@
package org.apache.comet.parquet
-import java.io.{File, FileFilter}
+import java.io.File
import java.math.{BigDecimal, BigInteger}
import java.time.{ZoneId, ZoneOffset}
@@ -31,20 +31,17 @@ import scala.util.control.Breaks.breakable
import org.scalactic.source.Position
import org.scalatest.Tag
-import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.example.data.simple.SimpleGroup
import org.apache.parquet.schema.MessageTypeParser
import org.apache.spark.SparkException
import org.apache.spark.sql.{CometTestBase, DataFrame, Row}
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec, CometScanExec}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.UTF8String
import com.google.common.primitives.UnsignedLong
@@ -703,76 +700,6 @@ abstract class ParquetReadSuite extends CometTestBase {
}
}
- test("partition column types") {
- withTempPath { dir =>
- Seq(1).toDF().repartition(1).write.parquet(dir.getCanonicalPath)
-
- val dataTypes =
- Seq(
- StringType,
- BooleanType,
- ByteType,
- BinaryType,
- ShortType,
- IntegerType,
- LongType,
- FloatType,
- DoubleType,
- DecimalType(25, 5),
- DateType,
- TimestampType)
-
- // TODO: support `NullType` here, after we add the support in `ColumnarBatchRow`
- val constantValues =
- Seq(
- UTF8String.fromString("a string"),
- true,
- 1.toByte,
- "Spark SQL".getBytes,
- 2.toShort,
- 3,
- Long.MaxValue,
- 0.25.toFloat,
- 0.75d,
- Decimal("1234.23456"),
- DateTimeUtils.fromJavaDate(java.sql.Date.valueOf("2015-01-01")),
- DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123")))
-
- dataTypes.zip(constantValues).foreach { case (dt, v) =>
- val schema = StructType(StructField("pcol", dt) :: Nil)
- val conf = SQLConf.get
- val partitionValues = new GenericInternalRow(Array(v))
- val file = dir
- .listFiles(new FileFilter {
- override def accept(pathname: File): Boolean =
- pathname.isFile && pathname.toString.endsWith("parquet")
- })
- .head
- val reader = new BatchReader(
- file.toString,
- CometConf.COMET_BATCH_SIZE.get(conf),
- schema,
- partitionValues)
- reader.init()
-
- try {
- reader.nextBatch()
- val batch = reader.currentBatch()
- val actual = batch.getRow(0).get(1, dt)
- val expected = v
- if (dt.isInstanceOf[BinaryType]) {
- assert(
- actual.asInstanceOf[Array[Byte]] sameElements expected.asInstanceOf[Array[Byte]])
- } else {
- assert(actual == expected)
- }
- } finally {
- reader.close()
- }
- }
- }
- }
-
test("partition columns - multiple batch") {
withSQLConf(
CometConf.COMET_BATCH_SIZE.key -> 7.toString,
@@ -1535,116 +1462,6 @@ abstract class ParquetReadSuite extends CometTestBase {
}
}
- test("test pre-fetching multiple files") {
- def makeRawParquetFile(
- path: Path,
- dictionaryEnabled: Boolean,
- n: Int,
- pageSize: Int): Seq[Option[Int]] = {
- val schemaStr =
- """
- |message root {
- | optional boolean _1;
- | optional int32 _2(INT_8);
- | optional int32 _3(INT_16);
- | optional int32 _4;
- | optional int64 _5;
- | optional float _6;
- | optional double _7;
- | optional binary _8(UTF8);
- | optional int32 _9(UINT_8);
- | optional int32 _10(UINT_16);
- | optional int32 _11(UINT_32);
- | optional int64 _12(UINT_64);
- | optional binary _13(ENUM);
- |}
- """.stripMargin
-
- val schema = MessageTypeParser.parseMessageType(schemaStr)
- val writer = createParquetWriter(
- schema,
- path,
- dictionaryEnabled = dictionaryEnabled,
- pageSize = pageSize,
- dictionaryPageSize = pageSize)
-
- val rand = new scala.util.Random(42)
- val expected = (0 until n).map { i =>
- if (rand.nextBoolean()) {
- None
- } else {
- Some(i)
- }
- }
- expected.foreach { opt =>
- val record = new SimpleGroup(schema)
- opt match {
- case Some(i) =>
- record.add(0, i % 2 == 0)
- record.add(1, i.toByte)
- record.add(2, i.toShort)
- record.add(3, i)
- record.add(4, i.toLong)
- record.add(5, i.toFloat)
- record.add(6, i.toDouble)
- record.add(7, i.toString * 48)
- record.add(8, (-i).toByte)
- record.add(9, (-i).toShort)
- record.add(10, -i)
- record.add(11, (-i).toLong)
- record.add(12, i.toString)
- case _ =>
- }
- writer.write(record)
- }
-
- writer.close()
- expected
- }
-
- val conf = new Configuration()
- conf.set("spark.comet.scan.preFetch.enabled", "true");
- conf.set("spark.comet.scan.preFetch.threadNum", "4");
-
- withTempDir { dir =>
- val threadPool = CometPrefetchThreadPool.getOrCreateThreadPool(2)
-
- val readers = (0 to 10).map { idx =>
- val path = new Path(dir.toURI.toString, s"part-r-$idx.parquet")
- makeRawParquetFile(path, dictionaryEnabled = false, 10000, 500)
-
- val reader = new BatchReader(conf, path.toString, 1000, null, null)
- reader.submitPrefetchTask(threadPool)
-
- reader
- }
-
- // Wait for all pre-fetch tasks
- readers.foreach { reader =>
- val task = reader.getPrefetchTask()
- task.get()
- }
-
- val totolRows = readers.map { reader =>
- val queue = reader.getPrefetchQueue()
- var rowCount = 0L
-
- while (!queue.isEmpty) {
- val rowGroup = queue.take().getLeft
- rowCount += rowGroup.getRowCount
- }
-
- reader.close()
-
- rowCount
- }.sum
-
- readParquetFile(dir.toString) { df =>
- assert(df.count() == totolRows)
- }
- }
- }
-
test("test merge scan range") {
def makeRawParquetFile(path: Path, n: Int): Seq[Option[Int]] = {
val dictionaryPageSize = 1024
@@ -1753,23 +1570,6 @@ abstract class ParquetReadSuite extends CometTestBase {
}
}
- override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
- pos: Position): Unit = {
- Seq(true, false).foreach { prefetch =>
- val cometTestName = if (prefetch) {
- testName + " (prefetch enabled)"
- } else {
- testName
- }
-
- super.test(cometTestName, testTags: _*) {
- withSQLConf(CometConf.COMET_SCAN_PREFETCH_ENABLED.key -> prefetch.toString) {
- testFun
- }
- }
- }
- }
-
private def withId(id: Int) =
new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, id).build()
@@ -2036,11 +1836,7 @@ class ParquetReadV2Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper {
val scans = collect(r.filter(f).queryExecution.executedPlan) { case p: CometBatchScanExec =>
p.scan
}
- if (CometConf.COMET_ENABLED.get()) {
- assert(scans.nonEmpty && scans.forall(_.isInstanceOf[CometParquetScan]))
- } else {
- assert(!scans.exists(_.isInstanceOf[CometParquetScan]))
- }
+ assert(scans.isEmpty)
}
}
diff --git a/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala b/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala
index a349ab2b9..18dec6817 100644
--- a/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala
@@ -30,7 +30,6 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.comet.CometConf
-import org.apache.comet.parquet.CometParquetScan
import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator}
/**
@@ -127,11 +126,6 @@ class CometScanRuleSuite extends CometTestBase {
if (cometEnabled) {
assert(countOperators(transformedPlan, classOf[BatchScanExec]) == 0)
assert(countOperators(transformedPlan, classOf[CometBatchScanExec]) == 1)
-
- // CometScanRule should have replaced the underlying scan
- val scan = transformedPlan.collect { case scan: CometBatchScanExec => scan }.head
- assert(scan.wrapped.scan.isInstanceOf[CometParquetScan])
-
} else {
assert(countOperators(transformedPlan, classOf[BatchScanExec]) == 1)
assert(countOperators(transformedPlan, classOf[CometBatchScanExec]) == 0)
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala
index a2f196a4f..5b0371b27 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala
@@ -38,7 +38,6 @@ import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnVector
import org.apache.comet.{CometConf, WithHdfsCluster}
-import org.apache.comet.parquet.BatchReader
/**
* Benchmark to measure Comet read performance. To run this benchmark:
@@ -179,29 +178,6 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
}
}
- sqlBenchmark.addCase("ParquetReader Comet") { _ =>
- files.map(_.asInstanceOf[String]).foreach { p =>
- val reader = new BatchReader(p, vectorizedReaderBatchSize)
- reader.init()
- try {
- var totalNumRows = 0
- while (reader.nextBatch()) {
- val batch = reader.currentBatch()
- val column = batch.column(0)
- val numRows = batch.numRows()
- var i = 0
- while (i < numRows) {
- if (!column.isNullAt(i)) aggregateValue(column, i)
- i += 1
- }
- totalNumRows += batch.numRows()
- }
- } finally {
- reader.close()
- }
- }
- }
-
sqlBenchmark.run()
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]