This is an automated email from the ASF dual-hosted git repository. parthc pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push: new bdd5c4339 fix: [iceberg] more fixes for Iceberg integration APIs. (#2078) bdd5c4339 is described below commit bdd5c4339ff38c49993a2b37fffce8d61b741d0d Author: Parth Chandra <par...@apache.org> AuthorDate: Fri Aug 8 08:13:56 2025 -0700 fix: [iceberg] more fixes for Iceberg integration APIs. (#2078) * fix: [iceberg] more fixes for Iceberg intergation APIs. --- .../apache/comet/parquet/AbstractColumnReader.java | 6 +- .../java/org/apache/comet/parquet/BatchReader.java | 9 ++- .../org/apache/comet/parquet/ColumnReader.java | 4 +- .../apache/comet/parquet/ConstantColumnReader.java | 15 ++++- .../java/org/apache/comet/parquet/FileReader.java | 19 +++--- .../org/apache/comet/parquet/LazyColumnReader.java | 2 +- .../apache/comet/parquet/MetadataColumnReader.java | 13 +++++ .../apache/comet/parquet/NativeBatchReader.java | 2 +- .../apache/comet/parquet/NativeColumnReader.java | 2 +- .../apache/comet/parquet/ParquetColumnSpec.java | 6 ++ .../java/org/apache/comet/parquet/TypeUtil.java | 13 ++++- .../main/java/org/apache/comet/parquet/Utils.java | 68 +++++++++++++++++++++- .../org/apache/comet/parquet/WrappedInputFile.java | 67 +++++++++++++++++++++ .../comet/parquet/WrappedSeekableInputStream.java | 64 ++++++++++++++++++++ .../apache/comet/vector/CometDelegateVector.java | 2 +- 15 files changed, 267 insertions(+), 25 deletions(-) diff --git a/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java b/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java index ef97abf74..b9f1797cb 100644 --- a/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java +++ b/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java @@ -63,7 +63,7 @@ public abstract class AbstractColumnReader implements AutoCloseable { /** A pointer to the native implementation of ColumnReader. */ protected long nativeHandle; - public AbstractColumnReader( + AbstractColumnReader( DataType type, Type fieldType, ColumnDescriptor descriptor, @@ -76,7 +76,7 @@ public abstract class AbstractColumnReader implements AutoCloseable { this.useLegacyDateTimestamp = useLegacyDateTimestamp; } - public AbstractColumnReader( + AbstractColumnReader( DataType type, ColumnDescriptor descriptor, boolean useDecimal128, @@ -85,7 +85,7 @@ public abstract class AbstractColumnReader implements AutoCloseable { TypeUtil.checkParquetType(descriptor, type); } - public ColumnDescriptor getDescriptor() { + ColumnDescriptor getDescriptor() { return descriptor; } diff --git a/common/src/main/java/org/apache/comet/parquet/BatchReader.java b/common/src/main/java/org/apache/comet/parquet/BatchReader.java index 538de4a66..edac28ec1 100644 --- a/common/src/main/java/org/apache/comet/parquet/BatchReader.java +++ b/common/src/main/java/org/apache/comet/parquet/BatchReader.java @@ -186,7 +186,8 @@ public class BatchReader extends RecordReader<Void, ColumnarBatch> implements Cl } /** - * @deprecated since 0.9.1, will be removed in 0.10.0. + * @deprecated since 0.10.0, will be removed in 0.11.0. + * @see <a href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue #2079</a> */ public BatchReader(AbstractColumnReader[] columnReaders) { // Todo: set useDecimal128 and useLazyMaterialization @@ -383,14 +384,16 @@ public class BatchReader extends RecordReader<Void, ColumnarBatch> implements Cl } /** - * @deprecated since 0.9.1, will be removed in 0.10.0. + * @deprecated since 0.10.0, will be removed in 0.11.0. + * @see <a href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue #2079</a> */ public void setSparkSchema(StructType schema) { this.sparkSchema = schema; } /** - * @deprecated since 0.9.1, will be removed in 0.10.0. + * @deprecated since 0.10.0, will be removed in 0.11.0. + * @see <a href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue #2079</a> */ public AbstractColumnReader[] getColumnReaders() { return columnReaders; diff --git a/common/src/main/java/org/apache/comet/parquet/ColumnReader.java b/common/src/main/java/org/apache/comet/parquet/ColumnReader.java index b2fe965e2..968da1959 100644 --- a/common/src/main/java/org/apache/comet/parquet/ColumnReader.java +++ b/common/src/main/java/org/apache/comet/parquet/ColumnReader.java @@ -93,7 +93,7 @@ public class ColumnReader extends AbstractColumnReader { private ArrowArray array = null; private ArrowSchema schema = null; - public ColumnReader( + ColumnReader( DataType type, ColumnDescriptor descriptor, CometSchemaImporter importer, @@ -111,6 +111,8 @@ public class ColumnReader extends AbstractColumnReader { * Set the page reader for a new column chunk to read. Expects to call `readBatch` after this. * * @param pageReader the page reader for the new column chunk + * @deprecated since 0.10.0, will be removed in 0.11.0. + * @see <a href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue #2079</a> */ public void setPageReader(PageReader pageReader) throws IOException { this.pageReader = pageReader; diff --git a/common/src/main/java/org/apache/comet/parquet/ConstantColumnReader.java b/common/src/main/java/org/apache/comet/parquet/ConstantColumnReader.java index e010f8ab7..b8fc49a17 100644 --- a/common/src/main/java/org/apache/comet/parquet/ConstantColumnReader.java +++ b/common/src/main/java/org/apache/comet/parquet/ConstantColumnReader.java @@ -38,7 +38,7 @@ public class ConstantColumnReader extends MetadataColumnReader { /** The constant value in the format of Object that are used to initialize this column reader. */ private Object value; - public ConstantColumnReader(StructField field, int batchSize, boolean useDecimal128) { + ConstantColumnReader(StructField field, int batchSize, boolean useDecimal128) { this(field.dataType(), TypeUtil.convertToParquet(field), batchSize, useDecimal128); this.value = ResolveDefaultColumns.getExistenceDefaultValues(new StructType(new StructField[] {field}))[ @@ -46,18 +46,29 @@ public class ConstantColumnReader extends MetadataColumnReader { init(value); } - public ConstantColumnReader( + ConstantColumnReader( StructField field, int batchSize, InternalRow values, int index, boolean useDecimal128) { this(field.dataType(), TypeUtil.convertToParquet(field), batchSize, useDecimal128); init(values, index); } + /** + * @deprecated since 0.10.0, will be removed in 0.11.0. + * @see <a href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue #2079</a> + */ public ConstantColumnReader( DataType type, ColumnDescriptor descriptor, Object value, boolean useDecimal128) { super(type, descriptor, useDecimal128, true); this.value = value; } + // Used by Iceberg + public ConstantColumnReader( + DataType type, ParquetColumnSpec spec, Object value, boolean useDecimal128) { + super(type, spec, useDecimal128, true); + this.value = value; + } + ConstantColumnReader( DataType type, ColumnDescriptor descriptor, int batchSize, boolean useDecimal128) { super(type, descriptor, useDecimal128, true); diff --git a/common/src/main/java/org/apache/comet/parquet/FileReader.java b/common/src/main/java/org/apache/comet/parquet/FileReader.java index af6c5b3c0..fa0d81f13 100644 --- a/common/src/main/java/org/apache/comet/parquet/FileReader.java +++ b/common/src/main/java/org/apache/comet/parquet/FileReader.java @@ -43,7 +43,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.parquet.HadoopReadOptions; import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.Preconditions; @@ -130,15 +129,14 @@ public class FileReader implements Closeable { private RowGroupReader currentRowGroup = null; private InternalFileDecryptor fileDecryptor; - public FileReader(InputFile file, ParquetReadOptions options, ReadOptions cometOptions) + FileReader(InputFile file, ParquetReadOptions options, ReadOptions cometOptions) throws IOException { this(file, null, options, cometOptions, null); } /** This constructor is called from Apache Iceberg. */ public FileReader( - Path path, - Configuration conf, + WrappedInputFile file, ReadOptions cometOptions, Map<String, String> properties, Long start, @@ -147,9 +145,10 @@ public class FileReader implements Closeable { byte[] fileAADPrefix) throws IOException { ParquetReadOptions options = - buildParquetReadOptions(conf, properties, start, length, fileEncryptionKey, fileAADPrefix); + buildParquetReadOptions( + new Configuration(), properties, start, length, fileEncryptionKey, fileAADPrefix); this.converter = new ParquetMetadataConverter(options); - this.file = CometInputFile.fromPath(path, conf); + this.file = file; this.f = file.newStream(); this.options = options; this.cometOptions = cometOptions; @@ -177,7 +176,7 @@ public class FileReader implements Closeable { this.crc = options.usePageChecksumVerification() ? new CRC32() : null; } - public FileReader( + FileReader( InputFile file, ParquetReadOptions options, ReadOptions cometOptions, @@ -186,7 +185,7 @@ public class FileReader implements Closeable { this(file, null, options, cometOptions, metrics); } - public FileReader( + FileReader( InputFile file, ParquetMetadata footer, ParquetReadOptions options, @@ -226,12 +225,12 @@ public class FileReader implements Closeable { } /** Returns the footer of the Parquet file being read. */ - public ParquetMetadata getFooter() { + ParquetMetadata getFooter() { return this.footer; } /** Returns the metadata of the Parquet file being read. */ - public FileMetaData getFileMetaData() { + FileMetaData getFileMetaData() { return this.fileMetaData; } diff --git a/common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java b/common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java index b22278ea7..f2772908b 100644 --- a/common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java +++ b/common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java @@ -43,7 +43,7 @@ public class LazyColumnReader extends ColumnReader { // The lazy vector being updated. private final CometLazyVector vector; - public LazyColumnReader( + LazyColumnReader( DataType sparkReadType, ColumnDescriptor descriptor, CometSchemaImporter importer, diff --git a/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java b/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java index 2820c42f8..6240c8c8c 100644 --- a/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java +++ b/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java @@ -42,6 +42,10 @@ public class MetadataColumnReader extends AbstractColumnReader { private boolean isConstant; + /** + * @deprecated since 0.10.0, will be made package private in 0.11.0. + * @see <a href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue #2079</a> + */ public MetadataColumnReader( DataType type, ColumnDescriptor descriptor, boolean useDecimal128, boolean isConstant) { // TODO: should we handle legacy dates & timestamps for metadata columns? @@ -50,6 +54,15 @@ public class MetadataColumnReader extends AbstractColumnReader { this.isConstant = isConstant; } + // Used by Iceberg + public MetadataColumnReader( + DataType type, ParquetColumnSpec spec, boolean useDecimal128, boolean isConstant) { + // TODO: should we handle legacy dates & timestamps for metadata columns? + super(type, Utils.buildColumnDescriptor(spec), useDecimal128, false); + + this.isConstant = isConstant; + } + @Override public void setBatchSize(int batchSize) { close(); diff --git a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java index ec22c8e4d..7595242c3 100644 --- a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java +++ b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java @@ -186,7 +186,7 @@ public class NativeBatchReader extends RecordReader<Void, ColumnarBatch> impleme this.taskContext = TaskContext$.MODULE$.get(); } - public NativeBatchReader(AbstractColumnReader[] columnReaders) { + private NativeBatchReader(AbstractColumnReader[] columnReaders) { // Todo: set useDecimal128 and useLazyMaterialization int numColumns = columnReaders.length; this.columnReaders = new AbstractColumnReader[numColumns]; diff --git a/common/src/main/java/org/apache/comet/parquet/NativeColumnReader.java b/common/src/main/java/org/apache/comet/parquet/NativeColumnReader.java index 50838ef77..88447c147 100644 --- a/common/src/main/java/org/apache/comet/parquet/NativeColumnReader.java +++ b/common/src/main/java/org/apache/comet/parquet/NativeColumnReader.java @@ -69,7 +69,7 @@ public class NativeColumnReader extends AbstractColumnReader { private long nativeBatchHandle = 0xDEADBEEFL; private final int columnNum; - public NativeColumnReader( + NativeColumnReader( long nativeBatchHandle, int columnNum, DataType type, diff --git a/common/src/main/java/org/apache/comet/parquet/ParquetColumnSpec.java b/common/src/main/java/org/apache/comet/parquet/ParquetColumnSpec.java index 49007f925..805aaa033 100644 --- a/common/src/main/java/org/apache/comet/parquet/ParquetColumnSpec.java +++ b/common/src/main/java/org/apache/comet/parquet/ParquetColumnSpec.java @@ -21,6 +21,12 @@ package org.apache.comet.parquet; import java.util.Map; +/** + * Parquet ColumnSpec encapsulates the information withing a Parquet ColumnDescriptor. Utility + * methods can convert from and to a ColumnDescriptor The only purpose of this class is to allow + * passing of Column descriptors between Comet and Iceberg. This is required because Iceberg shades + * Parquet, changing the package of Parquet classes and making then incompatible with Comet. + */ public class ParquetColumnSpec { private final int fieldId; diff --git a/common/src/main/java/org/apache/comet/parquet/TypeUtil.java b/common/src/main/java/org/apache/comet/parquet/TypeUtil.java index 1e9d5b937..889e2baf5 100644 --- a/common/src/main/java/org/apache/comet/parquet/TypeUtil.java +++ b/common/src/main/java/org/apache/comet/parquet/TypeUtil.java @@ -31,9 +31,16 @@ import org.apache.spark.sql.types.*; import org.apache.comet.CometConf; +import static org.apache.comet.parquet.Utils.descriptorToParquetColumnSpec; + public class TypeUtil { - /** Converts the input Spark 'field' into a Parquet column descriptor. */ + /** + * Converts the input Spark 'field' into a Parquet column descriptor. + * + * @deprecated since 0.10.0, will be removed in 0.11.0. + * @see <a href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue #2079</a> + */ public static ColumnDescriptor convertToParquet(StructField field) { Type.Repetition repetition; int maxDefinitionLevel; @@ -105,6 +112,10 @@ public class TypeUtil { return new ColumnDescriptor(path, builder.named(field.name()), 0, maxDefinitionLevel); } + public static ParquetColumnSpec convertToParquetSpec(StructField field) { + return descriptorToParquetColumnSpec(convertToParquet(field)); + } + /** * Check whether the Parquet 'descriptor' and Spark read type 'sparkType' are compatible. If not, * throw exception. diff --git a/common/src/main/java/org/apache/comet/parquet/Utils.java b/common/src/main/java/org/apache/comet/parquet/Utils.java index 3e2e093a8..7fb2eac5b 100644 --- a/common/src/main/java/org/apache/comet/parquet/Utils.java +++ b/common/src/main/java/org/apache/comet/parquet/Utils.java @@ -19,6 +19,9 @@ package org.apache.comet.parquet; +import java.util.HashMap; +import java.util.Map; + import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.PrimitiveType; @@ -54,8 +57,9 @@ public class Utils { /** * This method is called from Apache Iceberg. * - * @deprecated since 0.9.1, will be removed in 0.10.0; use getColumnReader with ParquetColumnSpec + * @deprecated since 0.10.0, will be removed in 0.11.0; use getColumnReader with ParquetColumnSpec * instead. + * @see <a href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue #2079</a> */ public static ColumnReader getColumnReader( DataType type, @@ -453,4 +457,66 @@ public class Utils { throw new IllegalArgumentException("Unknown logical type: " + logicalTypeName); } } + + public static ParquetColumnSpec descriptorToParquetColumnSpec(ColumnDescriptor descriptor) { + + String[] path = descriptor.getPath(); + PrimitiveType primitiveType = descriptor.getPrimitiveType(); + String physicalType = primitiveType.getPrimitiveTypeName().name(); + + int typeLength = + primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY + ? primitiveType.getTypeLength() + : 0; + + boolean isRepeated = primitiveType.getRepetition() == Type.Repetition.REPEATED; + + String logicalTypeName = null; + Map<String, String> logicalTypeParams = new HashMap<>(); + LogicalTypeAnnotation logicalType = primitiveType.getLogicalTypeAnnotation(); + + if (logicalType != null) { + logicalTypeName = logicalType.getClass().getSimpleName(); + + // Handle specific logical types + if (logicalType instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) { + LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimal = + (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType; + logicalTypeParams.put("precision", String.valueOf(decimal.getPrecision())); + logicalTypeParams.put("scale", String.valueOf(decimal.getScale())); + } else if (logicalType instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) { + LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestamp = + (LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) logicalType; + logicalTypeParams.put("isAdjustedToUTC", String.valueOf(timestamp.isAdjustedToUTC())); + logicalTypeParams.put("unit", timestamp.getUnit().name()); + } else if (logicalType instanceof LogicalTypeAnnotation.TimeLogicalTypeAnnotation) { + LogicalTypeAnnotation.TimeLogicalTypeAnnotation time = + (LogicalTypeAnnotation.TimeLogicalTypeAnnotation) logicalType; + logicalTypeParams.put("isAdjustedToUTC", String.valueOf(time.isAdjustedToUTC())); + logicalTypeParams.put("unit", time.getUnit().name()); + } else if (logicalType instanceof LogicalTypeAnnotation.IntLogicalTypeAnnotation) { + LogicalTypeAnnotation.IntLogicalTypeAnnotation intType = + (LogicalTypeAnnotation.IntLogicalTypeAnnotation) logicalType; + logicalTypeParams.put("isSigned", String.valueOf(intType.isSigned())); + logicalTypeParams.put("bitWidth", String.valueOf(intType.getBitWidth())); + } + } + + int id = -1; + Type type = descriptor.getPrimitiveType(); + if (type != null && type.getId() != null) { + id = type.getId().intValue(); + } + + return new ParquetColumnSpec( + id, + path, + physicalType, + typeLength, + isRepeated, + descriptor.getMaxDefinitionLevel(), + descriptor.getMaxRepetitionLevel(), + logicalTypeName, + logicalTypeParams); + } } diff --git a/common/src/main/java/org/apache/comet/parquet/WrappedInputFile.java b/common/src/main/java/org/apache/comet/parquet/WrappedInputFile.java new file mode 100644 index 000000000..666d4c2e7 --- /dev/null +++ b/common/src/main/java/org/apache/comet/parquet/WrappedInputFile.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.parquet; + +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.Method; + +import org.apache.parquet.io.InputFile; +import org.apache.parquet.io.SeekableInputStream; + +/** + * Wraps an Object that possibly implements the methods of a Parquet InputFile (but is not a Parquet + * InputFile). Such an object` exists, for instance, in Iceberg's InputFile + */ +public class WrappedInputFile implements InputFile { + Object wrapped; + + public WrappedInputFile(Object inputFile) { + this.wrapped = inputFile; + } + + @Override + public long getLength() throws IOException { + try { + Method targetMethod = wrapped.getClass().getDeclaredMethod("getLength"); // + targetMethod.setAccessible(true); + return (long) targetMethod.invoke(wrapped); + } catch (Exception e) { + throw new IOException(e); + } + } + + @Override + public SeekableInputStream newStream() throws IOException { + try { + Method targetMethod = wrapped.getClass().getDeclaredMethod("newStream"); // + targetMethod.setAccessible(true); + InputStream stream = (InputStream) targetMethod.invoke(wrapped); + return new WrappedSeekableInputStream(stream); + } catch (Exception e) { + throw new IOException(e); + } + } + + @Override + public String toString() { + return wrapped.toString(); + } +} diff --git a/common/src/main/java/org/apache/comet/parquet/WrappedSeekableInputStream.java b/common/src/main/java/org/apache/comet/parquet/WrappedSeekableInputStream.java new file mode 100644 index 000000000..c463617bd --- /dev/null +++ b/common/src/main/java/org/apache/comet/parquet/WrappedSeekableInputStream.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.parquet; + +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.Method; +import java.util.Objects; + +import org.apache.parquet.io.DelegatingSeekableInputStream; + +/** + * Wraps an InputStream that possibly implements the methods of a Parquet SeekableInputStream (but + * is not a Parquet SeekableInputStream). Such an InputStream exists, for instance, in Iceberg's + * SeekableInputStream + */ +public class WrappedSeekableInputStream extends DelegatingSeekableInputStream { + + private final InputStream wrappedInputStream; // The InputStream we are wrapping + + public WrappedSeekableInputStream(InputStream inputStream) { + super(inputStream); + this.wrappedInputStream = Objects.requireNonNull(inputStream, "InputStream cannot be null"); + } + + @Override + public long getPos() throws IOException { + try { + Method targetMethod = wrappedInputStream.getClass().getDeclaredMethod("getPos"); // + targetMethod.setAccessible(true); + return (long) targetMethod.invoke(wrappedInputStream); + } catch (Exception e) { + throw new IOException(e); + } + } + + @Override + public void seek(long newPos) throws IOException { + try { + Method targetMethod = wrappedInputStream.getClass().getDeclaredMethod("seek", long.class); + targetMethod.setAccessible(true); + targetMethod.invoke(wrappedInputStream, newPos); + } catch (Exception e) { + throw new IOException(e); + } + } +} diff --git a/common/src/main/java/org/apache/comet/vector/CometDelegateVector.java b/common/src/main/java/org/apache/comet/vector/CometDelegateVector.java index 75e62ea0f..8874d11b7 100644 --- a/common/src/main/java/org/apache/comet/vector/CometDelegateVector.java +++ b/common/src/main/java/org/apache/comet/vector/CometDelegateVector.java @@ -48,7 +48,7 @@ public class CometDelegateVector extends CometVector { this.delegate = delegate; } - public void setDelegate(CometVector delegate) { + protected void setDelegate(CometVector delegate) { this.delegate = delegate; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org