This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch comet-parquet-exec
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/comet-parquet-exec by this
push:
new ab093376 [comet-parquet-exec] Comet parquet exec 2 (copy of Parth's
PR) (#1138)
ab093376 is described below
commit ab09337699876c839f841c3ed4545279130d1522
Author: Andy Grove <[email protected]>
AuthorDate: Wed Dec 4 11:51:45 2024 -0700
[comet-parquet-exec] Comet parquet exec 2 (copy of Parth's PR) (#1138)
* WIP: (POC2) A Parquet reader that uses the arrow-rs Parquet reader
directly
* Change default config
---------
Co-authored-by: Parth Chandra <[email protected]>
---
.../main/java/org/apache/comet/parquet/Native.java | 52 +++
.../apache/comet/parquet/NativeBatchReader.java | 507 +++++++++++++++++++++
.../apache/comet/parquet/NativeColumnReader.java | 190 ++++++++
.../main/scala/org/apache/comet/CometConf.scala | 12 +-
native/core/src/parquet/mod.rs | 220 ++++++++-
.../comet/parquet/CometParquetFileFormat.scala | 51 ++-
.../scala/org/apache/spark/sql/CometTestBase.scala | 3 +
7 files changed, 1018 insertions(+), 17 deletions(-)
diff --git a/common/src/main/java/org/apache/comet/parquet/Native.java
b/common/src/main/java/org/apache/comet/parquet/Native.java
index 1e666652..1ed01d32 100644
--- a/common/src/main/java/org/apache/comet/parquet/Native.java
+++ b/common/src/main/java/org/apache/comet/parquet/Native.java
@@ -234,4 +234,56 @@ public final class Native extends NativeBase {
* @param handle the handle to the native Parquet column reader
*/
public static native void closeColumnReader(long handle);
+
+ ///////////// Arrow Native Parquet Reader APIs
+ // TODO: Add partitionValues(?), improve requiredColumns to use a projection mask that
+ // corresponds to Arrow.
+ // Add batch size, datetimeRebaseModeSpec, metrics (how?)...
+
+ /**
+ * Initialize a record batch reader for a PartitionedFile.
+ *
+ * @param filePath path to the Parquet file
+ * @param start start offset of the file split
+ * @param length length of the file split
+ * @param required_columns array of names of fields to read
+ * @return a handle to the record batch reader, used in subsequent calls.
+ */
+ public static native long initRecordBatchReader(
+ String filePath, long start, long length, Object[] required_columns);
+
+ public static native int numRowGroups(long handle);
+
+ public static native long numTotalRows(long handle);
+
+ // arrow native version of read batch
+ /**
+ * Read the next batch of data into memory on the native side.
+ *
+ * @param handle the handle to the record batch reader
+ * @return the number of rows read
+ */
+ public static native int readNextRecordBatch(long handle);
+
+ // Arrow native equivalent of currentBatch. 'columnNum' is the index of the column in the
+ // record batch.
+ /**
+ * Load the column corresponding to columnNum in the currently loaded record batch into the JVM.
+ *
+ * @param handle the handle to the record batch reader
+ * @param columnNum index of the column in the record batch
+ * @param arrayAddr memory address of an allocated ArrowArray (Arrow C data interface)
+ * @param schemaAddr memory address of an allocated ArrowSchema (Arrow C data interface)
+ */
+ public static native void currentColumnBatch(
+ long handle, int columnNum, long arrayAddr, long schemaAddr);
+
+ // arrow native version to close record batch reader
+
+ /**
+ * Close the record batch reader and free its resources.
+ *
+ * @param handle the handle to the record batch reader
+ */
+ public static native void closeRecordBatchReader(long handle);
}
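
For reference, here is a minimal sketch of how the JVM is expected to drive the native reader API declared above. The file path, split length, and column names are placeholders and error handling is omitted; only the call sequence is grounded in the methods added in this hunk.

    // Hypothetical driver loop for the new native record batch reader (placeholder values).
    long handle =
        Native.initRecordBatchReader(
            "file:///tmp/example.parquet", 0, 1_000_000, new Object[] {"id", "name"});
    try {
      long totalRows = Native.numTotalRows(handle);
      int rowsInBatch;
      while ((rowsInBatch = Native.readNextRecordBatch(handle)) > 0) {
        // For each projected column, export it via the Arrow C data interface by passing the
        // addresses of pre-allocated ArrowArray/ArrowSchema structs (see NativeColumnReader):
        // Native.currentColumnBatch(handle, columnNum, arrayAddr, schemaAddr);
      }
    } finally {
      Native.closeRecordBatchReader(handle);
    }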
diff --git
a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
new file mode 100644
index 00000000..17fab47e
--- /dev/null
+++ b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
@@ -0,0 +1,507 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.parquet;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.URISyntaxException;
+import java.util.*;
+
+import scala.Option;
+import scala.collection.Seq;
+import scala.collection.mutable.Buffer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.arrow.c.CometSchemaImporter;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.apache.spark.TaskContext;
+import org.apache.spark.TaskContext$;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.comet.parquet.CometParquetReadSupport;
+import org.apache.spark.sql.execution.datasources.PartitionedFile;
+import
org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter;
+import org.apache.spark.sql.execution.metric.SQLMetric;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+import org.apache.spark.util.AccumulatorV2;
+
+import org.apache.comet.CometConf;
+import org.apache.comet.shims.ShimBatchReader;
+import org.apache.comet.shims.ShimFileFormat;
+import org.apache.comet.vector.CometVector;
+
+/**
+ * A vectorized Parquet reader that reads a Parquet file in a batched fashion.
+ *
+ * <p>Example of how to use this:
+ *
+ * <pre>
+ * NativeBatchReader reader = new NativeBatchReader(parquetFile, batchSize);
+ * try {
+ * reader.init();
+ * while (reader.readBatch()) {
+ * ColumnarBatch batch = reader.currentBatch();
+ * // consume the batch
+ * }
+ * } finally { // resources associated with the reader should be released
+ * reader.close();
+ * }
+ * </pre>
+ */
+public class NativeBatchReader extends RecordReader<Void, ColumnarBatch>
implements Closeable {
+ private static final Logger LOG =
LoggerFactory.getLogger(NativeBatchReader.class);
+ protected static final BufferAllocator ALLOCATOR = new RootAllocator();
+
+ private Configuration conf;
+ private int capacity;
+ private boolean isCaseSensitive;
+ private boolean useFieldId;
+ private boolean ignoreMissingIds;
+ private StructType partitionSchema;
+ private InternalRow partitionValues;
+ private PartitionedFile file;
+ private final Map<String, SQLMetric> metrics;
+
+ private long rowsRead;
+ private StructType sparkSchema;
+ private MessageType requestedSchema;
+ private CometVector[] vectors;
+ private AbstractColumnReader[] columnReaders;
+ private CometSchemaImporter importer;
+ private ColumnarBatch currentBatch;
+ // private FileReader fileReader;
+ private boolean[] missingColumns;
+ private boolean isInitialized;
+ private ParquetMetadata footer;
+
+ /** The total number of rows across all row groups of the input split. */
+ private long totalRowCount;
+
+ /**
+ * Whether the native scan should always return decimals represented by 128 bits, regardless
+ * of their precision. Normally, this should be true if native execution is enabled, since
+ * Arrow compute kernels don't support 32 and 64 bit decimals yet.
+ */
+ // TODO: (ARROW NATIVE)
+ private boolean useDecimal128;
+
+ /**
+ * Whether to return dates/timestamps that were written with the legacy hybrid (Julian +
+ * Gregorian) calendar as is. If this is true, Comet will return them as is, instead of
+ * rebasing them to the new Proleptic Gregorian calendar. If this is false, Comet will throw
+ * exceptions when seeing these dates/timestamps.
+ */
+ // TODO: (ARROW NATIVE)
+ private boolean useLegacyDateTimestamp;
+
+ /** The TaskContext object for executing this task. */
+ private final TaskContext taskContext;
+
+ private long handle;
+
+ // Only for testing
+ public NativeBatchReader(String file, int capacity) {
+ this(file, capacity, null, null);
+ }
+
+ // Only for testing
+ public NativeBatchReader(
+ String file, int capacity, StructType partitionSchema, InternalRow
partitionValues) {
+ this(new Configuration(), file, capacity, partitionSchema,
partitionValues);
+ }
+
+ // Only for testing
+ public NativeBatchReader(
+ Configuration conf,
+ String file,
+ int capacity,
+ StructType partitionSchema,
+ InternalRow partitionValues) {
+
+ this.conf = conf;
+ this.capacity = capacity;
+ this.isCaseSensitive = false;
+ this.useFieldId = false;
+ this.ignoreMissingIds = false;
+ this.partitionSchema = partitionSchema;
+ this.partitionValues = partitionValues;
+
+ this.file = ShimBatchReader.newPartitionedFile(partitionValues, file);
+ this.metrics = new HashMap<>();
+
+ this.taskContext = TaskContext$.MODULE$.get();
+ }
+
+ public NativeBatchReader(AbstractColumnReader[] columnReaders) {
+ // Todo: set useDecimal128 and useLazyMaterialization
+ int numColumns = columnReaders.length;
+ this.columnReaders = new AbstractColumnReader[numColumns];
+ vectors = new CometVector[numColumns];
+ currentBatch = new ColumnarBatch(vectors);
+ // This constructor is used by Iceberg only. The columnReaders are
+ // initialized in Iceberg, so no need to call the init()
+ isInitialized = true;
+ this.taskContext = TaskContext$.MODULE$.get();
+ this.metrics = new HashMap<>();
+ }
+
+ NativeBatchReader(
+ Configuration conf,
+ PartitionedFile inputSplit,
+ ParquetMetadata footer,
+ int capacity,
+ StructType sparkSchema,
+ boolean isCaseSensitive,
+ boolean useFieldId,
+ boolean ignoreMissingIds,
+ boolean useLegacyDateTimestamp,
+ StructType partitionSchema,
+ InternalRow partitionValues,
+ Map<String, SQLMetric> metrics) {
+ this.conf = conf;
+ this.capacity = capacity;
+ this.sparkSchema = sparkSchema;
+ this.isCaseSensitive = isCaseSensitive;
+ this.useFieldId = useFieldId;
+ this.ignoreMissingIds = ignoreMissingIds;
+ this.useLegacyDateTimestamp = useLegacyDateTimestamp;
+ this.partitionSchema = partitionSchema;
+ this.partitionValues = partitionValues;
+ this.file = inputSplit;
+ this.footer = footer;
+ this.metrics = metrics;
+ this.taskContext = TaskContext$.MODULE$.get();
+ }
+
+ /**
+ * Initialize this reader. The reason we don't do it in the constructor is that we want to
+ * close any resources held by this reader if an error happens during initialization.
+ */
+ public void init() throws URISyntaxException, IOException {
+
+ useDecimal128 =
+ conf.getBoolean(
+ CometConf.COMET_USE_DECIMAL_128().key(),
+ (Boolean) CometConf.COMET_USE_DECIMAL_128().defaultValue().get());
+
+ long start = file.start();
+ long length = file.length();
+ String filePath = file.filePath().toString();
+
+ requestedSchema = footer.getFileMetaData().getSchema();
+ MessageType fileSchema = requestedSchema;
+ // TODO: (ARROW NATIVE) Get requested schema - convert the Spark schema (from Catalyst) into
+ // a list of fields to project (?). Fields must be matched by field id first and then by name.
+ { //////// Get requested schema - replace this block with native code (avoid reading the footer)
+ ParquetReadOptions.Builder builder = HadoopReadOptions.builder(conf, new
Path(filePath));
+
+ if (start >= 0 && length >= 0) {
+ builder = builder.withRange(start, start + length);
+ }
+ ParquetReadOptions readOptions = builder.build();
+
+ ReadOptions cometReadOptions = ReadOptions.builder(conf).build();
+
+ if (sparkSchema == null) {
+ sparkSchema = new
ParquetToSparkSchemaConverter(conf).convert(requestedSchema);
+ } else {
+ requestedSchema =
+ CometParquetReadSupport.clipParquetSchema(
+ requestedSchema, sparkSchema, isCaseSensitive, useFieldId,
ignoreMissingIds);
+ if (requestedSchema.getColumns().size() != sparkSchema.size()) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Spark schema has %d columns while " + "Parquet schema has
%d columns",
+ sparkSchema.size(), requestedSchema.getColumns().size()));
+ }
+ }
+ } ////// End get requested schema
+
+ //// Create Column readers
+ List<ColumnDescriptor> columns = requestedSchema.getColumns();
+ int numColumns = columns.size();
+ if (partitionSchema != null) numColumns += partitionSchema.size();
+ columnReaders = new AbstractColumnReader[numColumns];
+
+ // Initialize missing columns and use null vectors for them
+ missingColumns = new boolean[columns.size()];
+ List<String[]> paths = requestedSchema.getPaths();
+ StructField[] nonPartitionFields = sparkSchema.fields();
+ // ShimFileFormat.findRowIndexColumnIndexInSchema(sparkSchema);
+ for (int i = 0; i < requestedSchema.getFieldCount(); i++) {
+ Type t = requestedSchema.getFields().get(i);
+ Preconditions.checkState(
+ t.isPrimitive() && !t.isRepetition(Type.Repetition.REPEATED),
+ "Complex type is not supported");
+ String[] colPath = paths.get(i);
+ if
(nonPartitionFields[i].name().equals(ShimFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME()))
{
+ // Values of ROW_INDEX_TEMPORARY_COLUMN_NAME column are always
populated with
+ // generated row indexes, rather than read from the file.
+ // TODO(SPARK-40059): Allow users to include columns named
+ // FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME in
their schemas.
+ // TODO: (ARROW NATIVE) Support row indices ...
+ // long[] rowIndices = fileReader.getRowIndices();
+ // columnReaders[i] = new
RowIndexColumnReader(nonPartitionFields[i], capacity,
+ // rowIndices);
+ missingColumns[i] = true;
+ } else if (fileSchema.containsPath(colPath)) {
+ ColumnDescriptor fd = fileSchema.getColumnDescription(colPath);
+ if (!fd.equals(columns.get(i))) {
+ throw new UnsupportedOperationException("Schema evolution is not
supported");
+ }
+ missingColumns[i] = false;
+ } else {
+ if (columns.get(i).getMaxDefinitionLevel() == 0) {
+ throw new IOException(
+ "Required column '"
+ + Arrays.toString(colPath)
+ + "' is missing"
+ + " in data file "
+ + filePath);
+ }
+ ConstantColumnReader reader =
+ new ConstantColumnReader(nonPartitionFields[i], capacity,
useDecimal128);
+ columnReaders[i] = reader;
+ missingColumns[i] = true;
+ }
+ }
+
+ // Initialize constant readers for partition columns
+ if (partitionSchema != null) {
+ StructField[] partitionFields = partitionSchema.fields();
+ for (int i = columns.size(); i < columnReaders.length; i++) {
+ int fieldIndex = i - columns.size();
+ StructField field = partitionFields[fieldIndex];
+ ConstantColumnReader reader =
+ new ConstantColumnReader(field, capacity, partitionValues,
fieldIndex, useDecimal128);
+ columnReaders[i] = reader;
+ }
+ }
+
+ vectors = new CometVector[numColumns];
+ currentBatch = new ColumnarBatch(vectors);
+
+ // For test purposes only.
+ // If the last external accumulator is `NumRowGroupsAccumulator`, the number of row groups
+ // to read will be updated in the accumulator, so we can check in test cases whether the
+ // row groups are filtered or not.
+ // Note that this tries to get the thread-local TaskContext object; if this is called on
+ // another thread, it won't update the accumulator.
+ if (taskContext != null) {
+ Option<AccumulatorV2<?, ?>> accu =
getTaskAccumulator(taskContext.taskMetrics());
+ if (accu.isDefined() &&
accu.get().getClass().getSimpleName().equals("NumRowGroupsAcc")) {
+ @SuppressWarnings("unchecked")
+ AccumulatorV2<Integer, Integer> intAccum = (AccumulatorV2<Integer,
Integer>) accu.get();
+ // TODO: Get num_row_groups from native
+ // intAccum.add(fileReader.getRowGroups().size());
+ }
+ }
+
+ // TODO: (ARROW NATIVE) Use a ProjectionMask here ?
+ ArrayList<String> requiredColumns = new ArrayList<>();
+ for (Type col : requestedSchema.asGroupType().getFields()) {
+ requiredColumns.add(col.getName());
+ }
+ this.handle = Native.initRecordBatchReader(filePath, start, length,
requiredColumns.toArray());
+ totalRowCount = Native.numTotalRows(handle);
+ isInitialized = true;
+ }
+
+ public void setSparkSchema(StructType schema) {
+ this.sparkSchema = schema;
+ }
+
+ public AbstractColumnReader[] getColumnReaders() {
+ return columnReaders;
+ }
+
+ @Override
+ public void initialize(InputSplit inputSplit, TaskAttemptContext
taskAttemptContext)
+ throws IOException, InterruptedException {
+ // Do nothing. The initialization work is done in 'init' already.
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException {
+ return nextBatch();
+ }
+
+ @Override
+ public Void getCurrentKey() {
+ return null;
+ }
+
+ @Override
+ public ColumnarBatch getCurrentValue() {
+ return currentBatch();
+ }
+
+ @Override
+ public float getProgress() {
+ return (float) rowsRead / totalRowCount;
+ }
+
+ /**
+ * Returns the current columnar batch being read.
+ *
+ * <p>Note that this must be called AFTER {@link
NativeBatchReader#nextBatch()}.
+ */
+ public ColumnarBatch currentBatch() {
+ return currentBatch;
+ }
+
+ /**
+ * Loads the next batch of rows. This is called by both Spark and Iceberg.
+ *
+ * @return true if a batch was loaded and there are more rows to read; false otherwise.
+ */
+ public boolean nextBatch() throws IOException {
+ Preconditions.checkState(isInitialized, "init() should be called first!");
+
+ if (rowsRead >= totalRowCount) return false;
+ int batchSize;
+
+ try {
+ batchSize = loadNextBatch();
+ } catch (RuntimeException e) {
+ // Spark will check certain exception e.g.
`SchemaColumnConvertNotSupportedException`.
+ throw e;
+ } catch (Throwable e) {
+ throw new IOException(e);
+ }
+
+ if (batchSize == 0) return false;
+
+ long totalDecodeTime = 0, totalLoadTime = 0;
+ for (int i = 0; i < columnReaders.length; i++) {
+ AbstractColumnReader reader = columnReaders[i];
+ long startNs = System.nanoTime();
+ // TODO: read from native reader
+ reader.readBatch(batchSize);
+ // totalDecodeTime += System.nanoTime() - startNs;
+ // startNs = System.nanoTime();
+ vectors[i] = reader.currentBatch();
+ totalLoadTime += System.nanoTime() - startNs;
+ }
+
+ // TODO: (ARROW NATIVE) Add Metrics
+ // SQLMetric decodeMetric = metrics.get("ParquetNativeDecodeTime");
+ // if (decodeMetric != null) {
+ // decodeMetric.add(totalDecodeTime);
+ // }
+ SQLMetric loadMetric = metrics.get("ParquetNativeLoadTime");
+ if (loadMetric != null) {
+ loadMetric.add(totalLoadTime);
+ }
+
+ currentBatch.setNumRows(batchSize);
+ rowsRead += batchSize;
+ return true;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (columnReaders != null) {
+ for (AbstractColumnReader reader : columnReaders) {
+ if (reader != null) {
+ reader.close();
+ }
+ }
+ }
+ if (importer != null) {
+ importer.close();
+ importer = null;
+ }
+ Native.closeRecordBatchReader(this.handle);
+ }
+
+ @SuppressWarnings("deprecation")
+ private int loadNextBatch() throws Throwable {
+ long startNs = System.nanoTime();
+
+ int batchSize = Native.readNextRecordBatch(this.handle);
+ if (importer != null) importer.close();
+ importer = new CometSchemaImporter(ALLOCATOR);
+
+ List<ColumnDescriptor> columns = requestedSchema.getColumns();
+ for (int i = 0; i < columns.size(); i++) {
+ // TODO: (ARROW NATIVE) check this. Currently not handling missing
columns correctly?
+ if (missingColumns[i]) continue;
+ if (columnReaders[i] != null) columnReaders[i].close();
+ // TODO: (ARROW NATIVE) handle tz, datetime & int96 rebase
+ DataType dataType = sparkSchema.fields()[i].dataType();
+ NativeColumnReader reader =
+ new NativeColumnReader(
+ this.handle,
+ i,
+ dataType,
+ columns.get(i),
+ importer,
+ capacity,
+ useDecimal128,
+ useLegacyDateTimestamp);
+ columnReaders[i] = reader;
+ }
+ return batchSize;
+ }
+
+ // The signature of externalAccums changed from returning a Buffer to returning a Seq. If
+ // Comet expects a Buffer but the Spark version returns a Seq, or vice versa, we would get a
+ // method-not-found error, so we use reflection here instead.
+ @SuppressWarnings("unchecked")
+ private Option<AccumulatorV2<?, ?>> getTaskAccumulator(TaskMetrics
taskMetrics) {
+ Method externalAccumsMethod;
+ try {
+ externalAccumsMethod =
TaskMetrics.class.getDeclaredMethod("externalAccums");
+ externalAccumsMethod.setAccessible(true);
+ String returnType = externalAccumsMethod.getReturnType().getName();
+ if (returnType.equals("scala.collection.mutable.Buffer")) {
+ return ((Buffer<AccumulatorV2<?, ?>>)
externalAccumsMethod.invoke(taskMetrics))
+ .lastOption();
+ } else if (returnType.equals("scala.collection.Seq")) {
+ return ((Seq<AccumulatorV2<?, ?>>)
externalAccumsMethod.invoke(taskMetrics)).lastOption();
+ } else {
+ return Option.apply(null); // None
+ }
+ } catch (NoSuchMethodException | InvocationTargetException |
IllegalAccessException e) {
+ return Option.apply(null); // None
+ }
+ }
+}
diff --git
a/common/src/main/java/org/apache/comet/parquet/NativeColumnReader.java
b/common/src/main/java/org/apache/comet/parquet/NativeColumnReader.java
new file mode 100644
index 00000000..448ba0fe
--- /dev/null
+++ b/common/src/main/java/org/apache/comet/parquet/NativeColumnReader.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.parquet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.arrow.c.ArrowArray;
+import org.apache.arrow.c.ArrowSchema;
+import org.apache.arrow.c.CometSchemaImporter;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.dictionary.Dictionary;
+import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.page.*;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.spark.sql.types.DataType;
+
+import org.apache.comet.vector.*;
+
+// TODO: extend ColumnReader instead of AbstractColumnReader to reduce code
duplication
+public class NativeColumnReader extends AbstractColumnReader {
+ protected static final Logger LOG =
LoggerFactory.getLogger(NativeColumnReader.class);
+ protected final BufferAllocator ALLOCATOR = new RootAllocator();
+
+ /**
+ * The current Comet vector holding all the values read by this column
reader. Owned by this
+ * reader and MUST be closed after use.
+ */
+ private CometDecodedVector currentVector;
+
+ /** Dictionary values for this column. Only set if the column is using
dictionary encoding. */
+ protected CometDictionary dictionary;
+
+ /**
+ * The number of values in the current batch, used when we are skipping
importing of Arrow
+ * vectors, in which case we'll simply update the null count of the existing
vectors.
+ */
+ int currentNumValues;
+
+ /**
+ * Whether the last loaded vector contains any null value. This is used to determine if we
+ * can skip vector reloading. If the flag is false, the Arrow C API will skip importing the
+ * validity buffer, and therefore we cannot skip vector reloading.
+ */
+ boolean hadNull;
+
+ private final CometSchemaImporter importer;
+
+ private ArrowArray array = null;
+ private ArrowSchema schema = null;
+
+ private long nativeBatchHandle = 0xDEADBEEFL;
+ private final int columnNum;
+
+ public NativeColumnReader(
+ long nativeBatchHandle,
+ int columnNum,
+ DataType type,
+ ColumnDescriptor descriptor,
+ CometSchemaImporter importer,
+ int batchSize,
+ boolean useDecimal128,
+ boolean useLegacyDateTimestamp) {
+ super(type, descriptor, useDecimal128, useLegacyDateTimestamp);
+ assert batchSize > 0 : "Batch size must be positive, found " + batchSize;
+ this.batchSize = batchSize;
+ this.importer = importer;
+ this.nativeBatchHandle = nativeBatchHandle;
+ this.columnNum = columnNum;
+ initNative();
+ }
+
+ @Override
+ // Override in order to avoid creation of JVM side column readers
+ protected void initNative() {
+ LOG.debug(
+ "Native column reader " + String.join(".", this.descriptor.getPath())
+ " is initialized");
+ nativeHandle = 0;
+ }
+
+ @Override
+ public void readBatch(int total) {
+ LOG.debug("Reading column batch of size = " + total);
+
+ this.currentNumValues = total;
+ }
+
+ /** Returns the {@link CometVector} read by this reader. */
+ @Override
+ public CometVector currentBatch() {
+ return loadVector();
+ }
+
+ @Override
+ public void close() {
+ if (currentVector != null) {
+ currentVector.close();
+ currentVector = null;
+ }
+ super.close();
+ }
+
+ /** Returns a decoded {@link CometDecodedVector Comet vector}. */
+ public CometDecodedVector loadVector() {
+
+ LOG.debug("Loading vector for next batch");
+
+ // Close the previous vector first to release struct memory allocated to
import Arrow array &
+ // schema from native side, through the C data interface
+ if (currentVector != null) {
+ currentVector.close();
+ }
+
+ LogicalTypeAnnotation logicalTypeAnnotation =
+ descriptor.getPrimitiveType().getLogicalTypeAnnotation();
+ boolean isUuid =
+ logicalTypeAnnotation instanceof
LogicalTypeAnnotation.UUIDLogicalTypeAnnotation;
+
+ array = ArrowArray.allocateNew(ALLOCATOR);
+ schema = ArrowSchema.allocateNew(ALLOCATOR);
+
+ long arrayAddr = array.memoryAddress();
+ long schemaAddr = schema.memoryAddress();
+
+ Native.currentColumnBatch(nativeBatchHandle, columnNum, arrayAddr,
schemaAddr);
+
+ FieldVector vector = importer.importVector(array, schema);
+
+ DictionaryEncoding dictionaryEncoding = vector.getField().getDictionary();
+
+ CometPlainVector cometVector = new CometPlainVector(vector, useDecimal128);
+
+ // Update whether the current vector contains any null values. This is
used in the following
+ // batch(s) to determine whether we can skip loading the native vector.
+ hadNull = cometVector.hasNull();
+
+ if (dictionaryEncoding == null) {
+ if (dictionary != null) {
+ // This means the column was using dictionary encoding but has now fallen back to plain
+ // encoding on the native side. Setting 'dictionary' to null here, so we can use it as
+ // a condition to check whether we can reuse the vector later.
+ dictionary = null;
+ }
+ // Either the column is not dictionary encoded, or it was using
dictionary encoding but
+ // a new data page has switched back to use plain encoding. For both
cases we should
+ // return plain vector.
+ currentVector = cometVector;
+ return currentVector;
+ }
+
+ // We need to refresh the dictionary vector in `CometDictionary` here because the
+ // `Data.importVector` API will release the previous dictionary vector and create a new one.
+ Dictionary arrowDictionary =
importer.getProvider().lookup(dictionaryEncoding.getId());
+ CometPlainVector dictionaryVector =
+ new CometPlainVector(arrowDictionary.getVector(), useDecimal128,
isUuid);
+ if (dictionary != null) {
+ dictionary.setDictionaryVector(dictionaryVector);
+ } else {
+ dictionary = new CometDictionary(dictionaryVector);
+ }
+
+ currentVector =
+ new CometDictionaryVector(
+ cometVector, dictionary, importer.getProvider(), useDecimal128, false, isUuid);
+ return currentVector;
+ }
+}
diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala
b/common/src/main/scala/org/apache/comet/CometConf.scala
index 09355446..275114a1 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -85,7 +85,17 @@ object CometConf extends ShimCometConf {
"read supported data sources (currently only Parquet is supported
natively)." +
" By default, this config is true.")
.booleanConf
- .createWithDefault(true)
+ .createWithDefault(false)
+
+ val COMET_NATIVE_ARROW_SCAN_ENABLED: ConfigEntry[Boolean] = conf(
+ "spark.comet.native.arrow.scan.enabled")
+ .internal()
+ .doc(
+ "Whether to enable the fully native Arrow-based scan. When this is turned on, Spark " +
+ "will use Comet to read Parquet files natively via the Arrow-based Parquet reader." +
+ " By default, this config is false.")
+ .booleanConf
+ .createWithDefault(false)
val COMET_PARQUET_PARALLEL_IO_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.parquet.read.parallel.io.enabled")
diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs
index 455f1992..ffd34216 100644
--- a/native/core/src/parquet/mod.rs
+++ b/native/core/src/parquet/mod.rs
@@ -23,6 +23,7 @@ pub use mutable_vector::*;
pub mod util;
pub mod read;
+use std::fs::File;
use std::{boxed::Box, ptr::NonNull, sync::Arc};
use crate::errors::{try_unwrap_or_throw, CometError};
@@ -39,10 +40,18 @@ use jni::{
},
};
+use crate::execution::operators::ExecutionError;
use crate::execution::utils::SparkArrowConvert;
use arrow::buffer::{Buffer, MutableBuffer};
-use jni::objects::{JBooleanArray, JLongArray, JPrimitiveArray, ReleaseMode};
+use arrow_array::{Array, RecordBatch};
+use jni::objects::{
+ JBooleanArray, JLongArray, JObjectArray, JPrimitiveArray, JString,
ReleaseMode,
+};
+use jni::sys::jstring;
+use parquet::arrow::arrow_reader::{ParquetRecordBatchReader,
ParquetRecordBatchReaderBuilder};
+use parquet::arrow::ProjectionMask;
use read::ColumnReader;
+use url::Url;
use util::jni::{convert_column_descriptor, convert_encoding};
use self::util::jni::TypePromotionInfo;
@@ -582,3 +591,212 @@ fn from_u8_slice(src: &mut [u8]) -> &mut [i8] {
let raw_ptr = src.as_mut_ptr() as *mut i8;
unsafe { std::slice::from_raw_parts_mut(raw_ptr, src.len()) }
}
+
+// TODO: (ARROW NATIVE) remove this if not needed.
+enum ParquetReaderState {
+ Init,
+ Reading,
+ Complete,
+}
+/// Parquet read context maintained across multiple JNI calls.
+struct BatchContext {
+ batch_reader: ParquetRecordBatchReader,
+ current_batch: Option<RecordBatch>,
+ reader_state: ParquetReaderState,
+ num_row_groups: i32,
+ total_rows: i64,
+}
+
+#[inline]
+fn get_batch_context<'a>(handle: jlong) -> Result<&'a mut BatchContext,
CometError> {
+ unsafe {
+ (handle as *mut BatchContext)
+ .as_mut()
+ .ok_or_else(|| CometError::NullPointer("null batch context
handle".to_string()))
+ }
+}
+
+#[inline]
+fn get_batch_reader<'a>(handle: jlong) -> Result<&'a mut
ParquetRecordBatchReader, CometError> {
+ Ok(&mut get_batch_context(handle)?.batch_reader)
+}
+
+#[no_mangle]
+pub extern "system" fn
Java_org_apache_comet_parquet_Native_initRecordBatchReader(
+ e: JNIEnv,
+ _jclass: JClass,
+ file_path: jstring,
+ start: jlong,
+ length: jlong,
+ required_columns: jobjectArray,
+) -> jlong {
+ try_unwrap_or_throw(&e, |mut env| unsafe {
+ let path: String = env
+ .get_string(&JString::from_raw(file_path))
+ .unwrap()
+ .into();
+ //TODO: (ARROW NATIVE) - this works only for 'file://' urls
+ let path = Url::parse(path.as_ref()).unwrap().to_file_path().unwrap();
+ let file = File::open(path).unwrap();
+
+ // Create a sync Parquet reader builder with batch_size.
+ // batch_size is the number of rows to buffer at once from pages; defaults to 1024.
+ // TODO: (ARROW NATIVE) Use async reader ParquetRecordBatchStreamBuilder
+ let mut builder = ParquetRecordBatchReaderBuilder::try_new(file)
+ .unwrap()
+ .with_batch_size(8192); // TODO: (ARROW NATIVE) Use batch size configured in the JVM
+
+ //TODO: (ARROW NATIVE) if we can get the ParquetMetadata serialized,
we need not do this.
+ let metadata = builder.metadata().clone();
+
+ let mut columns_to_read: Vec<usize> = Vec::new();
+ let columns_to_read_array = JObjectArray::from_raw(required_columns);
+ let array_len = env.get_array_length(&columns_to_read_array)?;
+ let mut required_columns: Vec<String> = Vec::new();
+ for i in 0..array_len {
+ let p: JString = env
+ .get_object_array_element(&columns_to_read_array, i)?
+ .into();
+ required_columns.push(env.get_string(&p)?.into());
+ }
+ for (i, col) in metadata
+ .file_metadata()
+ .schema_descr()
+ .columns()
+ .iter()
+ .enumerate()
+ {
+ for required in required_columns.iter() {
+ if col.name().to_uppercase().eq(&required.to_uppercase()) {
+ columns_to_read.push(i);
+ break;
+ }
+ }
+ }
+ //TODO: (ARROW NATIVE) make this work for complex types (especially
deeply nested structs)
+ let mask =
ProjectionMask::leaves(metadata.file_metadata().schema_descr(),
columns_to_read);
+ // Set the projection mask to read only the required columns.
+ builder = builder.with_projection(mask);
+
+ let mut row_groups_to_read: Vec<usize> = Vec::new();
+ let mut total_rows: i64 = 0;
+ // Select the row groups whose byte range falls within [start, start + length)
+ for (i, rg) in metadata.row_groups().iter().enumerate() {
+ let rg_start = rg.file_offset().unwrap();
+ let rg_end = rg_start + rg.compressed_size();
+ if rg_start >= start && rg_end <= start + length {
+ row_groups_to_read.push(i);
+ total_rows += rg.num_rows();
+ }
+ }
+
+ // Build a sync parquet reader.
+ let batch_reader = builder
+ .with_row_groups(row_groups_to_read.clone())
+ .build()
+ .unwrap();
+
+ let ctx = BatchContext {
+ batch_reader,
+ current_batch: None,
+ reader_state: ParquetReaderState::Init,
+ num_row_groups: row_groups_to_read.len() as i32,
+ total_rows,
+ };
+ let res = Box::new(ctx);
+ Ok(Box::into_raw(res) as i64)
+ })
+}
+
+#[no_mangle]
+pub extern "system" fn Java_org_apache_comet_parquet_Native_numRowGroups(
+ e: JNIEnv,
+ _jclass: JClass,
+ handle: jlong,
+) -> jint {
+ try_unwrap_or_throw(&e, |_env| {
+ let context = get_batch_context(handle)?;
+ // Return the number of row groups selected for this split
+ Ok(context.num_row_groups)
+ }) as jint
+}
+
+#[no_mangle]
+pub extern "system" fn Java_org_apache_comet_parquet_Native_numTotalRows(
+ e: JNIEnv,
+ _jclass: JClass,
+ handle: jlong,
+) -> jlong {
+ try_unwrap_or_throw(&e, |_env| {
+ let context = get_batch_context(handle)?;
+ // Return the total number of rows across the selected row groups
+ Ok(context.total_rows)
+ }) as jlong
+}
+
+#[no_mangle]
+pub extern "system" fn
Java_org_apache_comet_parquet_Native_readNextRecordBatch(
+ e: JNIEnv,
+ _jclass: JClass,
+ handle: jlong,
+) -> jint {
+ try_unwrap_or_throw(&e, |_env| {
+ let context = get_batch_context(handle)?;
+ let batch_reader = &mut context.batch_reader;
+ // Read data
+ let mut rows_read: i32 = 0;
+ let batch = batch_reader.next();
+
+ match batch {
+ Some(record_batch) => {
+ let batch = record_batch?;
+ rows_read = batch.num_rows() as i32;
+ context.current_batch = Some(batch);
+ context.reader_state = ParquetReaderState::Reading;
+ }
+ None => {
+ context.current_batch = None;
+ context.reader_state = ParquetReaderState::Complete;
+ }
+ }
+ Ok(rows_read)
+ })
+}
+
+#[no_mangle]
+pub extern "system" fn Java_org_apache_comet_parquet_Native_currentColumnBatch(
+ e: JNIEnv,
+ _jclass: JClass,
+ handle: jlong,
+ column_idx: jint,
+ array_addr: jlong,
+ schema_addr: jlong,
+) {
+ try_unwrap_or_throw(&e, |_env| {
+ let context = get_batch_context(handle)?;
+ let current_batch = context
+ .current_batch
+ .as_mut()
+ .ok_or_else(|| CometError::Execution {
+ source: ExecutionError::GeneralError("There is no more data to read".to_string()),
+ })?;
+ let data = current_batch.column(column_idx as usize).into_data();
+ data.move_to_spark(array_addr, schema_addr)
+ .map_err(|e| e.into())
+ })
+}
+
+#[no_mangle]
+pub extern "system" fn
Java_org_apache_comet_parquet_Native_closeRecordBatchReader(
+ env: JNIEnv,
+ _jclass: JClass,
+ handle: jlong,
+) {
+ try_unwrap_or_throw(&env, |_| {
+ unsafe {
+ let ctx = handle as *mut BatchContext;
+ let _ = Box::from_raw(ctx);
+ };
+ Ok(())
+ })
+}
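
On the JVM side, Java_org_apache_comet_parquet_Native_currentColumnBatch is consumed through the Arrow C data interface. The following condensed sketch is lifted from NativeColumnReader.loadVector earlier in this patch; the allocator, importer, handle, column index, and useDecimal128 flag are assumed to exist in the caller.

    // Condensed from NativeColumnReader.loadVector; setup of allocator and importer is assumed.
    ArrowArray array = ArrowArray.allocateNew(allocator);
    ArrowSchema schema = ArrowSchema.allocateNew(allocator);
    Native.currentColumnBatch(handle, columnNum, array.memoryAddress(), schema.memoryAddress());
    FieldVector vector = importer.importVector(array, schema);
    CometPlainVector cometVector = new CometPlainVector(vector, useDecimal128);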
diff --git
a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
index 52d8d09a..4c96bef4 100644
--- a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
+++ b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
@@ -100,6 +100,7 @@ class CometParquetFileFormat extends ParquetFileFormat with
MetricsSupport with
// Comet specific configurations
val capacity = CometConf.COMET_BATCH_SIZE.get(sqlConf)
+ val nativeArrowReaderEnabled =
CometConf.COMET_NATIVE_ARROW_SCAN_ENABLED.get(sqlConf)
(file: PartitionedFile) => {
val sharedConf = broadcastedHadoopConf.value.value
@@ -134,22 +135,42 @@ class CometParquetFileFormat extends ParquetFileFormat
with MetricsSupport with
}
pushed.foreach(p => ParquetInputFormat.setFilterPredicate(sharedConf, p))
- val batchReader = new BatchReader(
- sharedConf,
- file,
- footer,
- capacity,
- requiredSchema,
- isCaseSensitive,
- useFieldId,
- ignoreMissingIds,
- datetimeRebaseSpec.mode == CORRECTED,
- partitionSchema,
- file.partitionValues,
- JavaConverters.mapAsJavaMap(metrics))
- val iter = new RecordReaderIterator(batchReader)
+ val recordBatchReader =
+ if (nativeArrowReaderEnabled) {
+ val batchReader = new NativeBatchReader(
+ sharedConf,
+ file,
+ footer,
+ capacity,
+ requiredSchema,
+ isCaseSensitive,
+ useFieldId,
+ ignoreMissingIds,
+ datetimeRebaseSpec.mode == CORRECTED,
+ partitionSchema,
+ file.partitionValues,
+ JavaConverters.mapAsJavaMap(metrics))
+ batchReader.init()
+ batchReader
+ } else {
+ val batchReader = new BatchReader(
+ sharedConf,
+ file,
+ footer,
+ capacity,
+ requiredSchema,
+ isCaseSensitive,
+ useFieldId,
+ ignoreMissingIds,
+ datetimeRebaseSpec.mode == CORRECTED,
+ partitionSchema,
+ file.partitionValues,
+ JavaConverters.mapAsJavaMap(metrics))
+ batchReader.init()
+ batchReader
+ }
+ val iter = new RecordReaderIterator(recordBatchReader)
try {
- batchReader.init()
iter.asInstanceOf[Iterator[InternalRow]]
} catch {
case e: Throwable =>
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
index 35ba0690..99ed5d3c 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
@@ -79,6 +79,9 @@ abstract class CometTestBase
conf.set(CometConf.COMET_EXEC_ENABLED.key, "true")
conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true")
conf.set(CometConf.COMET_SPARK_TO_ARROW_ENABLED.key, "true")
+ conf.set(CometConf.COMET_NATIVE_SCAN_ENABLED.key, "true")
+ conf.set(CometConf.COMET_FULL_NATIVE_SCAN_ENABLED.key, "true")
+ conf.set(CometConf.COMET_NATIVE_ARROW_SCAN_ENABLED.key, "false")
conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "2g")
conf.set(CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key,
"true")
conf
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]