martin-g commented on code in PR #2680:
URL: https://github.com/apache/datafusion-comet/pull/2680#discussion_r2508929151
##########
common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:
##########
@@ -421,9 +515,40 @@ public void init() throws Throwable {
CometFileKeyUnwrapper keyUnwrapper = null;
if (encryptionEnabled) {
keyUnwrapper = new CometFileKeyUnwrapper();
- keyUnwrapper.storeDecryptionKeyRetriever(file.filePath().toString(),
conf);
+ keyUnwrapper.storeDecryptionKeyRetriever(filePath, conf);
}
+ // Filter out columns with preinitialized readers from sparkSchema
before making the
+ // call to native
+ if (preInitializedReaders != null) {
+ StructType filteredSchema = new StructType();
+ StructField[] sparkFields = sparkSchema.fields();
+ // List<Type> fileFields = fileSchema.getFields();
+ for (int i = 0; i < sparkFields.length; i++) {
+ // Keep the column if:
+ // 1. It doesn't have a preinitialized reader, OR
+ // 2. It has a preinitialized reader but exists in fileSchema
+ boolean hasPreInitializedReader =
+ i < preInitializedReaders.length && preInitializedReaders[i] !=
null;
+ int finalI = i;
+ boolean existsInFileSchema =
+ fileFields.stream().anyMatch(f ->
f.getName().equals(sparkFields[finalI].name()));
+
+ if (!hasPreInitializedReader || existsInFileSchema) {
+ filteredSchema = filteredSchema.add(sparkFields[i]);
+ }
+ }
+ sparkSchema = filteredSchema;
+ }
+
+ // String timeZoneId = conf.get("spark.sql.session.timeZone");
+ String timeZoneId = "UTC";
Review Comment:
Is this intentional ?
If it is then either move the comment below one line up or add a new comment
why `timeZoneId` should be also always UTC. The commented out
`conf.get("spark.sql.session.timeZone");` could be removed too.
##########
common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:
##########
@@ -421,9 +515,40 @@ public void init() throws Throwable {
CometFileKeyUnwrapper keyUnwrapper = null;
if (encryptionEnabled) {
keyUnwrapper = new CometFileKeyUnwrapper();
- keyUnwrapper.storeDecryptionKeyRetriever(file.filePath().toString(),
conf);
+ keyUnwrapper.storeDecryptionKeyRetriever(filePath, conf);
}
+ // Filter out columns with preinitialized readers from sparkSchema
before making the
+ // call to native
+ if (preInitializedReaders != null) {
+ StructType filteredSchema = new StructType();
+ StructField[] sparkFields = sparkSchema.fields();
+ // List<Type> fileFields = fileSchema.getFields();
+ for (int i = 0; i < sparkFields.length; i++) {
+ // Keep the column if:
+ // 1. It doesn't have a preinitialized reader, OR
+ // 2. It has a preinitialized reader but exists in fileSchema
+ boolean hasPreInitializedReader =
+ i < preInitializedReaders.length && preInitializedReaders[i] !=
null;
+ int finalI = i;
+ boolean existsInFileSchema =
+ fileFields.stream().anyMatch(f ->
f.getName().equals(sparkFields[finalI].name()));
Review Comment:
Should this equality check take into account `spark.sql.caseSensitive` ?
##########
common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:
##########
@@ -834,16 +987,34 @@ private int loadNextBatch() throws Throwable {
importer = new CometSchemaImporter(ALLOCATOR);
List<Type> fields = requestedSchema.getFields();
+ StructField[] sparkFields = sparkSchema.fields();
+
for (int i = 0; i < fields.size(); i++) {
if (!missingColumns[i]) {
if (columnReaders[i] != null) columnReaders[i].close();
// TODO: (ARROW NATIVE) handle tz, datetime & int96 rebase
- DataType dataType = sparkSchema.fields()[i].dataType();
Type field = fields.get(i);
+
+ // Find the corresponding spark field by matching field names
+ DataType dataType = null;
+ int sparkSchemaIndex = -1;
+ for (int j = 0; j < sparkFields.length; j++) {
+ if (sparkFields[j].name().equals(field.getName())) {
Review Comment:
Should this equality check take into account `spark.sql.caseSensitive` ?
If it is sensitive then it could be optimized by storing the sparkFields in
a Map<String, Field> and lookup by name here instead of looping over them for
each `field`
##########
common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:
##########
@@ -364,23 +453,28 @@ public void init() throws Throwable {
checkColumn(parquetFields[i]);
missingColumns[i] = false;
} else {
- if (field.getRepetition() == Type.Repetition.REQUIRED) {
- throw new IOException(
- "Required column '"
- + field.getName()
- + "' is missing"
- + " in data file "
- + filePath);
- }
- if (field.isPrimitive()) {
- ConstantColumnReader reader =
- new ConstantColumnReader(nonPartitionFields[i], capacity,
useDecimal128);
- columnReaders[i] = reader;
+ if (preInitializedReaders != null && preInitializedReaders[i] !=
null) {
Review Comment:
```suggestion
if (preInitializedReaders != null && i <
preInitializedReaders.length && preInitializedReaders[i] != null) {
```
##########
common/src/main/java/org/apache/comet/parquet/IcebergCometNativeBatchReader.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.parquet;
+
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.execution.metric.SQLMetric;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A specialized NativeBatchReader for Iceberg that accepts ParquetMetadata as
a Thrift encoded byte
+ * array . This allows Iceberg to pass metadata in serialized form with a
two-step initialization
+ * pattern.
+ */
+public class IcebergCometNativeBatchReader extends NativeBatchReader {
+
+ public IcebergCometNativeBatchReader(StructType requiredSchema) {
+ super();
+ this.sparkSchema = requiredSchema;
+ }
+
+ /** Initialize the reader using FileInfo instead of PartitionedFile. */
Review Comment:
```suggestion
/** Initialize the reader using FileInfo instead of PartitionedFile. */
@Override
```
##########
common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:
##########
@@ -261,15 +356,15 @@ public void init() throws Throwable {
ParquetReadOptions readOptions = builder.build();
Map<String, String> objectStoreOptions =
- asJava(NativeConfig.extractObjectStoreOptions(conf, file.pathUri()));
+ asJava(NativeConfig.extractObjectStoreOptions(conf, pathUri));
// TODO: enable off-heap buffer when they are ready
ReadOptions cometReadOptions = ReadOptions.builder(conf).build();
Path path = new Path(new URI(filePath));
try (FileReader fileReader =
new FileReader(
- CometInputFile.fromPath(path, conf), footer, readOptions,
cometReadOptions, metrics)) {
Review Comment:
Why is the footer not passed anymore ? This way it will be re-read at
https://github.com/parthchandra/datafusion-comet/blob/d8cd7b78c3509b2ec147d4991e0664d1a63febc1/common/src/main/java/org/apache/comet/parquet/FileReader.java#L201-L203
##########
common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:
##########
@@ -648,13 +773,41 @@ private void checkParquetType(ParquetColumn column)
throws IOException {
}
} else { // A missing column which is either primitive or complex
if (column.required()) {
- // Column is missing in data but the required data is non-nullable.
This file is invalid.
- throw new IOException(
- "Required column is missing in data file. Col: " +
Arrays.toString(path));
+ // check if we have a preinitialized column reader for this column.
+ int columnIndex = getColumnIndexFromParquetColumn(column);
+ if (columnIndex == -1
+ || preInitializedReaders == null
+ || preInitializedReaders[columnIndex] == null) {
Review Comment:
This probably needs a check for boundaries before trying to access this
index.
##########
common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:
##########
@@ -101,36 +102,87 @@
* </pre>
*/
public class NativeBatchReader extends RecordReader<Void, ColumnarBatch>
implements Closeable {
+
+ /**
+ * A class that contains the necessary file information for reading a
Parquet file. This class
+ * provides an abstraction over PartitionedFile properties.
+ */
+ public static class FileInfo {
+ private final long start;
+ private final long length;
+ private final String filePath;
+ private final long fileSize;
+
+ public FileInfo(long start, long length, String filePath, long fileSize)
+ throws URISyntaxException {
+ this.start = start;
+ this.length = length;
+ URI uri = new Path(filePath).toUri();
+ if (uri.getScheme() == null) {
+ uri = new Path("file://" + filePath).toUri();
+ }
+ this.filePath = uri.toString();
+ this.fileSize = fileSize;
+ }
+
+ public static FileInfo fromPartitionedFile(PartitionedFile file) throws
URISyntaxException {
+ return new FileInfo(file.start(), file.length(),
file.filePath().toString(), file.fileSize());
+ }
+
+ public long start() {
+ return start;
+ }
+
+ public long length() {
+ return length;
+ }
+
+ public String filePath() {
+ return filePath;
+ }
+
+ public long fileSize() {
+ return fileSize;
+ }
+
+ public URI pathUri() throws Exception {
Review Comment:
```suggestion
public URI pathUri() throws URISyntaxException {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]