This is an automated email from the ASF dual-hosted git repository.

parthc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new bdd5c4339 fix: [iceberg] more fixes for Iceberg integration APIs. 
(#2078)
bdd5c4339 is described below

commit bdd5c4339ff38c49993a2b37fffce8d61b741d0d
Author: Parth Chandra <par...@apache.org>
AuthorDate: Fri Aug 8 08:13:56 2025 -0700

    fix: [iceberg] more fixes for Iceberg integration APIs. (#2078)
    
    * fix: [iceberg] more fixes for Iceberg integration APIs.
---
 .../apache/comet/parquet/AbstractColumnReader.java |  6 +-
 .../java/org/apache/comet/parquet/BatchReader.java |  9 ++-
 .../org/apache/comet/parquet/ColumnReader.java     |  4 +-
 .../apache/comet/parquet/ConstantColumnReader.java | 15 ++++-
 .../java/org/apache/comet/parquet/FileReader.java  | 19 +++---
 .../org/apache/comet/parquet/LazyColumnReader.java |  2 +-
 .../apache/comet/parquet/MetadataColumnReader.java | 13 +++++
 .../apache/comet/parquet/NativeBatchReader.java    |  2 +-
 .../apache/comet/parquet/NativeColumnReader.java   |  2 +-
 .../apache/comet/parquet/ParquetColumnSpec.java    |  6 ++
 .../java/org/apache/comet/parquet/TypeUtil.java    | 13 ++++-
 .../main/java/org/apache/comet/parquet/Utils.java  | 68 +++++++++++++++++++++-
 .../org/apache/comet/parquet/WrappedInputFile.java | 67 +++++++++++++++++++++
 .../comet/parquet/WrappedSeekableInputStream.java  | 64 ++++++++++++++++++++
 .../apache/comet/vector/CometDelegateVector.java   |  2 +-
 15 files changed, 267 insertions(+), 25 deletions(-)

diff --git 
a/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java 
b/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java
index ef97abf74..b9f1797cb 100644
--- a/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java
@@ -63,7 +63,7 @@ public abstract class AbstractColumnReader implements 
AutoCloseable {
   /** A pointer to the native implementation of ColumnReader. */
   protected long nativeHandle;
 
-  public AbstractColumnReader(
+  AbstractColumnReader(
       DataType type,
       Type fieldType,
       ColumnDescriptor descriptor,
@@ -76,7 +76,7 @@ public abstract class AbstractColumnReader implements 
AutoCloseable {
     this.useLegacyDateTimestamp = useLegacyDateTimestamp;
   }
 
-  public AbstractColumnReader(
+  AbstractColumnReader(
       DataType type,
       ColumnDescriptor descriptor,
       boolean useDecimal128,
@@ -85,7 +85,7 @@ public abstract class AbstractColumnReader implements 
AutoCloseable {
     TypeUtil.checkParquetType(descriptor, type);
   }
 
-  public ColumnDescriptor getDescriptor() {
+  ColumnDescriptor getDescriptor() {
     return descriptor;
   }
 
diff --git a/common/src/main/java/org/apache/comet/parquet/BatchReader.java 
b/common/src/main/java/org/apache/comet/parquet/BatchReader.java
index 538de4a66..edac28ec1 100644
--- a/common/src/main/java/org/apache/comet/parquet/BatchReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/BatchReader.java
@@ -186,7 +186,8 @@ public class BatchReader extends RecordReader<Void, 
ColumnarBatch> implements Cl
   }
 
   /**
-   * @deprecated since 0.9.1, will be removed in 0.10.0.
+   * @deprecated since 0.10.0, will be removed in 0.11.0.
+   * @see <a 
href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue 
#2079</a>
    */
   public BatchReader(AbstractColumnReader[] columnReaders) {
     // Todo: set useDecimal128 and useLazyMaterialization
@@ -383,14 +384,16 @@ public class BatchReader extends RecordReader<Void, 
ColumnarBatch> implements Cl
   }
 
   /**
-   * @deprecated since 0.9.1, will be removed in 0.10.0.
+   * @deprecated since 0.10.0, will be removed in 0.11.0.
+   * @see <a 
href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue 
#2079</a>
    */
   public void setSparkSchema(StructType schema) {
     this.sparkSchema = schema;
   }
 
   /**
-   * @deprecated since 0.9.1, will be removed in 0.10.0.
+   * @deprecated since 0.10.0, will be removed in 0.11.0.
+   * @see <a 
href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue 
#2079</a>
    */
   public AbstractColumnReader[] getColumnReaders() {
     return columnReaders;
diff --git a/common/src/main/java/org/apache/comet/parquet/ColumnReader.java 
b/common/src/main/java/org/apache/comet/parquet/ColumnReader.java
index b2fe965e2..968da1959 100644
--- a/common/src/main/java/org/apache/comet/parquet/ColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/ColumnReader.java
@@ -93,7 +93,7 @@ public class ColumnReader extends AbstractColumnReader {
   private ArrowArray array = null;
   private ArrowSchema schema = null;
 
-  public ColumnReader(
+  ColumnReader(
       DataType type,
       ColumnDescriptor descriptor,
       CometSchemaImporter importer,
@@ -111,6 +111,8 @@ public class ColumnReader extends AbstractColumnReader {
    * Set the page reader for a new column chunk to read. Expects to call 
`readBatch` after this.
    *
    * @param pageReader the page reader for the new column chunk
+   * @deprecated since 0.10.0, will be removed in 0.11.0.
+   * @see <a 
href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue 
#2079</a>
    */
   public void setPageReader(PageReader pageReader) throws IOException {
     this.pageReader = pageReader;
diff --git 
a/common/src/main/java/org/apache/comet/parquet/ConstantColumnReader.java 
b/common/src/main/java/org/apache/comet/parquet/ConstantColumnReader.java
index e010f8ab7..b8fc49a17 100644
--- a/common/src/main/java/org/apache/comet/parquet/ConstantColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/ConstantColumnReader.java
@@ -38,7 +38,7 @@ public class ConstantColumnReader extends 
MetadataColumnReader {
   /** The constant value in the format of Object that are used to initialize 
this column reader. */
   private Object value;
 
-  public ConstantColumnReader(StructField field, int batchSize, boolean 
useDecimal128) {
+  ConstantColumnReader(StructField field, int batchSize, boolean 
useDecimal128) {
     this(field.dataType(), TypeUtil.convertToParquet(field), batchSize, 
useDecimal128);
     this.value =
         ResolveDefaultColumns.getExistenceDefaultValues(new StructType(new 
StructField[] {field}))[
@@ -46,18 +46,29 @@ public class ConstantColumnReader extends 
MetadataColumnReader {
     init(value);
   }
 
-  public ConstantColumnReader(
+  ConstantColumnReader(
       StructField field, int batchSize, InternalRow values, int index, boolean 
useDecimal128) {
     this(field.dataType(), TypeUtil.convertToParquet(field), batchSize, 
useDecimal128);
     init(values, index);
   }
 
+  /**
+   * @deprecated since 0.10.0, will be removed in 0.11.0.
+   * @see <a 
href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue 
#2079</a>
+   */
   public ConstantColumnReader(
       DataType type, ColumnDescriptor descriptor, Object value, boolean 
useDecimal128) {
     super(type, descriptor, useDecimal128, true);
     this.value = value;
   }
 
+  // Used by Iceberg
+  public ConstantColumnReader(
+      DataType type, ParquetColumnSpec spec, Object value, boolean 
useDecimal128) {
+    super(type, spec, useDecimal128, true);
+    this.value = value;
+  }
+
   ConstantColumnReader(
       DataType type, ColumnDescriptor descriptor, int batchSize, boolean 
useDecimal128) {
     super(type, descriptor, useDecimal128, true);
diff --git a/common/src/main/java/org/apache/comet/parquet/FileReader.java 
b/common/src/main/java/org/apache/comet/parquet/FileReader.java
index af6c5b3c0..fa0d81f13 100644
--- a/common/src/main/java/org/apache/comet/parquet/FileReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/FileReader.java
@@ -43,7 +43,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.parquet.HadoopReadOptions;
 import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.Preconditions;
@@ -130,15 +129,14 @@ public class FileReader implements Closeable {
   private RowGroupReader currentRowGroup = null;
   private InternalFileDecryptor fileDecryptor;
 
-  public FileReader(InputFile file, ParquetReadOptions options, ReadOptions 
cometOptions)
+  FileReader(InputFile file, ParquetReadOptions options, ReadOptions 
cometOptions)
       throws IOException {
     this(file, null, options, cometOptions, null);
   }
 
   /** This constructor is called from Apache Iceberg. */
   public FileReader(
-      Path path,
-      Configuration conf,
+      WrappedInputFile file,
       ReadOptions cometOptions,
       Map<String, String> properties,
       Long start,
@@ -147,9 +145,10 @@ public class FileReader implements Closeable {
       byte[] fileAADPrefix)
       throws IOException {
     ParquetReadOptions options =
-        buildParquetReadOptions(conf, properties, start, length, 
fileEncryptionKey, fileAADPrefix);
+        buildParquetReadOptions(
+            new Configuration(), properties, start, length, fileEncryptionKey, 
fileAADPrefix);
     this.converter = new ParquetMetadataConverter(options);
-    this.file = CometInputFile.fromPath(path, conf);
+    this.file = file;
     this.f = file.newStream();
     this.options = options;
     this.cometOptions = cometOptions;
@@ -177,7 +176,7 @@ public class FileReader implements Closeable {
     this.crc = options.usePageChecksumVerification() ? new CRC32() : null;
   }
 
-  public FileReader(
+  FileReader(
       InputFile file,
       ParquetReadOptions options,
       ReadOptions cometOptions,
@@ -186,7 +185,7 @@ public class FileReader implements Closeable {
     this(file, null, options, cometOptions, metrics);
   }
 
-  public FileReader(
+  FileReader(
       InputFile file,
       ParquetMetadata footer,
       ParquetReadOptions options,
@@ -226,12 +225,12 @@ public class FileReader implements Closeable {
   }
 
   /** Returns the footer of the Parquet file being read. */
-  public ParquetMetadata getFooter() {
+  ParquetMetadata getFooter() {
     return this.footer;
   }
 
   /** Returns the metadata of the Parquet file being read. */
-  public FileMetaData getFileMetaData() {
+  FileMetaData getFileMetaData() {
     return this.fileMetaData;
   }
 
diff --git 
a/common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java 
b/common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java
index b22278ea7..f2772908b 100644
--- a/common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java
@@ -43,7 +43,7 @@ public class LazyColumnReader extends ColumnReader {
   // The lazy vector being updated.
   private final CometLazyVector vector;
 
-  public LazyColumnReader(
+  LazyColumnReader(
       DataType sparkReadType,
       ColumnDescriptor descriptor,
       CometSchemaImporter importer,
diff --git 
a/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java 
b/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java
index 2820c42f8..6240c8c8c 100644
--- a/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java
@@ -42,6 +42,10 @@ public class MetadataColumnReader extends 
AbstractColumnReader {
 
   private boolean isConstant;
 
+  /**
+   * @deprecated since 0.10.0, will be made package private in 0.11.0.
+   * @see <a 
href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue 
#2079</a>
+   */
   public MetadataColumnReader(
       DataType type, ColumnDescriptor descriptor, boolean useDecimal128, 
boolean isConstant) {
     // TODO: should we handle legacy dates & timestamps for metadata columns?
@@ -50,6 +54,15 @@ public class MetadataColumnReader extends 
AbstractColumnReader {
     this.isConstant = isConstant;
   }
 
+  // Used by Iceberg
+  public MetadataColumnReader(
+      DataType type, ParquetColumnSpec spec, boolean useDecimal128, boolean 
isConstant) {
+    // TODO: should we handle legacy dates & timestamps for metadata columns?
+    super(type, Utils.buildColumnDescriptor(spec), useDecimal128, false);
+
+    this.isConstant = isConstant;
+  }
+
   @Override
   public void setBatchSize(int batchSize) {
     close();
diff --git 
a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java 
b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
index ec22c8e4d..7595242c3 100644
--- a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
@@ -186,7 +186,7 @@ public class NativeBatchReader extends RecordReader<Void, 
ColumnarBatch> impleme
     this.taskContext = TaskContext$.MODULE$.get();
   }
 
-  public NativeBatchReader(AbstractColumnReader[] columnReaders) {
+  private NativeBatchReader(AbstractColumnReader[] columnReaders) {
     // Todo: set useDecimal128 and useLazyMaterialization
     int numColumns = columnReaders.length;
     this.columnReaders = new AbstractColumnReader[numColumns];
diff --git 
a/common/src/main/java/org/apache/comet/parquet/NativeColumnReader.java 
b/common/src/main/java/org/apache/comet/parquet/NativeColumnReader.java
index 50838ef77..88447c147 100644
--- a/common/src/main/java/org/apache/comet/parquet/NativeColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/NativeColumnReader.java
@@ -69,7 +69,7 @@ public class NativeColumnReader extends AbstractColumnReader {
   private long nativeBatchHandle = 0xDEADBEEFL;
   private final int columnNum;
 
-  public NativeColumnReader(
+  NativeColumnReader(
       long nativeBatchHandle,
       int columnNum,
       DataType type,
diff --git 
a/common/src/main/java/org/apache/comet/parquet/ParquetColumnSpec.java 
b/common/src/main/java/org/apache/comet/parquet/ParquetColumnSpec.java
index 49007f925..805aaa033 100644
--- a/common/src/main/java/org/apache/comet/parquet/ParquetColumnSpec.java
+++ b/common/src/main/java/org/apache/comet/parquet/ParquetColumnSpec.java
@@ -21,6 +21,12 @@ package org.apache.comet.parquet;
 
 import java.util.Map;
 
+/**
+ * Parquet ColumnSpec encapsulates the information within a Parquet 
ColumnDescriptor. Utility
+ * methods can convert from and to a ColumnDescriptor. The only purpose of this 
class is to allow
+ * passing of Column descriptors between Comet and Iceberg. This is required 
because Iceberg shades
+ * Parquet, changing the package of Parquet classes and making them 
incompatible with Comet.
+ */
 public class ParquetColumnSpec {
 
   private final int fieldId;
diff --git a/common/src/main/java/org/apache/comet/parquet/TypeUtil.java 
b/common/src/main/java/org/apache/comet/parquet/TypeUtil.java
index 1e9d5b937..889e2baf5 100644
--- a/common/src/main/java/org/apache/comet/parquet/TypeUtil.java
+++ b/common/src/main/java/org/apache/comet/parquet/TypeUtil.java
@@ -31,9 +31,16 @@ import org.apache.spark.sql.types.*;
 
 import org.apache.comet.CometConf;
 
+import static org.apache.comet.parquet.Utils.descriptorToParquetColumnSpec;
+
 public class TypeUtil {
 
-  /** Converts the input Spark 'field' into a Parquet column descriptor. */
+  /**
+   * Converts the input Spark 'field' into a Parquet column descriptor.
+   *
+   * @deprecated since 0.10.0, will be removed in 0.11.0.
+   * @see <a 
href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue 
#2079</a>
+   */
   public static ColumnDescriptor convertToParquet(StructField field) {
     Type.Repetition repetition;
     int maxDefinitionLevel;
@@ -105,6 +112,10 @@ public class TypeUtil {
     return new ColumnDescriptor(path, builder.named(field.name()), 0, 
maxDefinitionLevel);
   }
 
+  public static ParquetColumnSpec convertToParquetSpec(StructField field) {
+    return descriptorToParquetColumnSpec(convertToParquet(field));
+  }
+
   /**
    * Check whether the Parquet 'descriptor' and Spark read type 'sparkType' 
are compatible. If not,
    * throw exception.
diff --git a/common/src/main/java/org/apache/comet/parquet/Utils.java 
b/common/src/main/java/org/apache/comet/parquet/Utils.java
index 3e2e093a8..7fb2eac5b 100644
--- a/common/src/main/java/org/apache/comet/parquet/Utils.java
+++ b/common/src/main/java/org/apache/comet/parquet/Utils.java
@@ -19,6 +19,9 @@
 
 package org.apache.comet.parquet;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.PrimitiveType;
@@ -54,8 +57,9 @@ public class Utils {
   /**
    * This method is called from Apache Iceberg.
    *
-   * @deprecated since 0.9.1, will be removed in 0.10.0; use getColumnReader 
with ParquetColumnSpec
+   * @deprecated since 0.10.0, will be removed in 0.11.0; use getColumnReader 
with ParquetColumnSpec
    *     instead.
+   * @see <a 
href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue 
#2079</a>
    */
   public static ColumnReader getColumnReader(
       DataType type,
@@ -453,4 +457,66 @@ public class Utils {
         throw new IllegalArgumentException("Unknown logical type: " + 
logicalTypeName);
     }
   }
+
+  public static ParquetColumnSpec 
descriptorToParquetColumnSpec(ColumnDescriptor descriptor) {
+
+    String[] path = descriptor.getPath();
+    PrimitiveType primitiveType = descriptor.getPrimitiveType();
+    String physicalType = primitiveType.getPrimitiveTypeName().name();
+
+    int typeLength =
+        primitiveType.getPrimitiveTypeName() == 
PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
+            ? primitiveType.getTypeLength()
+            : 0;
+
+    boolean isRepeated = primitiveType.getRepetition() == 
Type.Repetition.REPEATED;
+
+    String logicalTypeName = null;
+    Map<String, String> logicalTypeParams = new HashMap<>();
+    LogicalTypeAnnotation logicalType = 
primitiveType.getLogicalTypeAnnotation();
+
+    if (logicalType != null) {
+      logicalTypeName = logicalType.getClass().getSimpleName();
+
+      // Handle specific logical types
+      if (logicalType instanceof 
LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
+        LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimal =
+            (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType;
+        logicalTypeParams.put("precision", 
String.valueOf(decimal.getPrecision()));
+        logicalTypeParams.put("scale", String.valueOf(decimal.getScale()));
+      } else if (logicalType instanceof 
LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) {
+        LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestamp =
+            (LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) logicalType;
+        logicalTypeParams.put("isAdjustedToUTC", 
String.valueOf(timestamp.isAdjustedToUTC()));
+        logicalTypeParams.put("unit", timestamp.getUnit().name());
+      } else if (logicalType instanceof 
LogicalTypeAnnotation.TimeLogicalTypeAnnotation) {
+        LogicalTypeAnnotation.TimeLogicalTypeAnnotation time =
+            (LogicalTypeAnnotation.TimeLogicalTypeAnnotation) logicalType;
+        logicalTypeParams.put("isAdjustedToUTC", 
String.valueOf(time.isAdjustedToUTC()));
+        logicalTypeParams.put("unit", time.getUnit().name());
+      } else if (logicalType instanceof 
LogicalTypeAnnotation.IntLogicalTypeAnnotation) {
+        LogicalTypeAnnotation.IntLogicalTypeAnnotation intType =
+            (LogicalTypeAnnotation.IntLogicalTypeAnnotation) logicalType;
+        logicalTypeParams.put("isSigned", String.valueOf(intType.isSigned()));
+        logicalTypeParams.put("bitWidth", 
String.valueOf(intType.getBitWidth()));
+      }
+    }
+
+    int id = -1;
+    Type type = descriptor.getPrimitiveType();
+    if (type != null && type.getId() != null) {
+      id = type.getId().intValue();
+    }
+
+    return new ParquetColumnSpec(
+        id,
+        path,
+        physicalType,
+        typeLength,
+        isRepeated,
+        descriptor.getMaxDefinitionLevel(),
+        descriptor.getMaxRepetitionLevel(),
+        logicalTypeName,
+        logicalTypeParams);
+  }
 }
diff --git 
a/common/src/main/java/org/apache/comet/parquet/WrappedInputFile.java 
b/common/src/main/java/org/apache/comet/parquet/WrappedInputFile.java
new file mode 100644
index 000000000..666d4c2e7
--- /dev/null
+++ b/common/src/main/java/org/apache/comet/parquet/WrappedInputFile.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.parquet;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Method;
+
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.SeekableInputStream;
+
+/**
+ * Wraps an Object that possibly implements the methods of a Parquet InputFile 
(but is not a Parquet
+ * InputFile). Such an object exists, for instance, in Iceberg's InputFile.
+ */
+public class WrappedInputFile implements InputFile {
+  Object wrapped;
+
+  public WrappedInputFile(Object inputFile) {
+    this.wrapped = inputFile;
+  }
+
+  @Override
+  public long getLength() throws IOException {
+    try {
+      Method targetMethod = wrapped.getClass().getDeclaredMethod("getLength"); 
//
+      targetMethod.setAccessible(true);
+      return (long) targetMethod.invoke(wrapped);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public SeekableInputStream newStream() throws IOException {
+    try {
+      Method targetMethod = wrapped.getClass().getDeclaredMethod("newStream"); 
//
+      targetMethod.setAccessible(true);
+      InputStream stream = (InputStream) targetMethod.invoke(wrapped);
+      return new WrappedSeekableInputStream(stream);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return wrapped.toString();
+  }
+}
diff --git 
a/common/src/main/java/org/apache/comet/parquet/WrappedSeekableInputStream.java 
b/common/src/main/java/org/apache/comet/parquet/WrappedSeekableInputStream.java
new file mode 100644
index 000000000..c463617bd
--- /dev/null
+++ 
b/common/src/main/java/org/apache/comet/parquet/WrappedSeekableInputStream.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.parquet;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Method;
+import java.util.Objects;
+
+import org.apache.parquet.io.DelegatingSeekableInputStream;
+
+/**
+ * Wraps an InputStream that possibly implements the methods of a Parquet 
SeekableInputStream (but
+ * is not a Parquet SeekableInputStream). Such an InputStream exists, for 
instance, in Iceberg's
+ * SeekableInputStream.
+ */
+public class WrappedSeekableInputStream extends DelegatingSeekableInputStream {
+
+  private final InputStream wrappedInputStream; // The InputStream we are 
wrapping
+
+  public WrappedSeekableInputStream(InputStream inputStream) {
+    super(inputStream);
+    this.wrappedInputStream = Objects.requireNonNull(inputStream, "InputStream 
cannot be null");
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    try {
+      Method targetMethod = 
wrappedInputStream.getClass().getDeclaredMethod("getPos"); //
+      targetMethod.setAccessible(true);
+      return (long) targetMethod.invoke(wrappedInputStream);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void seek(long newPos) throws IOException {
+    try {
+      Method targetMethod = 
wrappedInputStream.getClass().getDeclaredMethod("seek", long.class);
+      targetMethod.setAccessible(true);
+      targetMethod.invoke(wrappedInputStream, newPos);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+}
diff --git 
a/common/src/main/java/org/apache/comet/vector/CometDelegateVector.java 
b/common/src/main/java/org/apache/comet/vector/CometDelegateVector.java
index 75e62ea0f..8874d11b7 100644
--- a/common/src/main/java/org/apache/comet/vector/CometDelegateVector.java
+++ b/common/src/main/java/org/apache/comet/vector/CometDelegateVector.java
@@ -48,7 +48,7 @@ public class CometDelegateVector extends CometVector {
     this.delegate = delegate;
   }
 
-  public void setDelegate(CometVector delegate) {
+  protected void setDelegate(CometVector delegate) {
     this.delegate = delegate;
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to