martin-g commented on code in PR #3237: URL: https://github.com/apache/datafusion-comet/pull/3237#discussion_r2718623577
########## docs/source/contributor-guide/iceberg_public_api.md: ########## @@ -0,0 +1,342 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Public API for Apache Iceberg Integration + +This document describes the Comet classes and methods that form the public API used by +[Apache Iceberg](https://iceberg.apache.org/). These APIs enable Iceberg to leverage Comet's +native Parquet reader for vectorized reads in Spark. + +**Important**: Changes to these APIs may break Iceberg's Comet integration. Contributors should +exercise caution when modifying these classes and consider backward compatibility. + +All classes and methods documented here are marked with the `@IcebergApi` annotation +(`org.apache.comet.IcebergApi`) to make them easily identifiable in the source code. + +## Overview + +Iceberg uses Comet's Parquet reading infrastructure to accelerate table scans. The integration +uses two approaches: + +1. **Hybrid Reader**: Native Parquet decoding with JVM-based I/O (requires building Iceberg + from source with Comet patches) +2. **Native Reader**: Fully-native Iceberg scans using iceberg-rust + +This document focuses on the Hybrid Reader API, which is used when Iceberg is configured with +`spark.sql.iceberg.parquet.reader-type=COMET`. + +## Package: `org.apache.comet.parquet` + +### FileReader + +Main class for reading Parquet files with native decoding. + +```java +// Constructor +public FileReader( + WrappedInputFile inputFile, + ReadOptions options, + Map<String, String> properties, + Long start, + Long length, + byte[] fileEncryptionKey, + byte[] fileAADPrefix +) throws IOException + +// Methods used by Iceberg +public void setRequestedSchemaFromSpecs(List<ParquetColumnSpec> specs) +public RowGroupReader readNextRowGroup() throws IOException +public void skipNextRowGroup() Review Comment: ```suggestion public boolean skipNextRowGroup() ``` ########## docs/source/contributor-guide/iceberg_public_api.md: ########## @@ -0,0 +1,342 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> + +# Public API for Apache Iceberg Integration + +This document describes the Comet classes and methods that form the public API used by +[Apache Iceberg](https://iceberg.apache.org/). These APIs enable Iceberg to leverage Comet's +native Parquet reader for vectorized reads in Spark. + +**Important**: Changes to these APIs may break Iceberg's Comet integration. Contributors should +exercise caution when modifying these classes and consider backward compatibility. + +All classes and methods documented here are marked with the `@IcebergApi` annotation +(`org.apache.comet.IcebergApi`) to make them easily identifiable in the source code. + +## Overview + +Iceberg uses Comet's Parquet reading infrastructure to accelerate table scans. The integration +uses two approaches: + +1. **Hybrid Reader**: Native Parquet decoding with JVM-based I/O (requires building Iceberg + from source with Comet patches) +2. **Native Reader**: Fully-native Iceberg scans using iceberg-rust + +This document focuses on the Hybrid Reader API, which is used when Iceberg is configured with +`spark.sql.iceberg.parquet.reader-type=COMET`. + +## Package: `org.apache.comet.parquet` + +### FileReader + +Main class for reading Parquet files with native decoding. + +```java +// Constructor +public FileReader( + WrappedInputFile inputFile, + ReadOptions options, + Map<String, String> properties, + Long start, + Long length, + byte[] fileEncryptionKey, + byte[] fileAADPrefix +) throws IOException + +// Methods used by Iceberg +public void setRequestedSchemaFromSpecs(List<ParquetColumnSpec> specs) +public RowGroupReader readNextRowGroup() throws IOException +public void skipNextRowGroup() +public void close() throws IOException +``` + +### RowGroupReader + +Provides access to row group data. + +```java +// Methods used by Iceberg +public long getRowCount() +``` + +### ReadOptions + +Configuration for Parquet read operations. + +```java +// Builder pattern +public static Builder builder(Configuration conf) + +public class Builder { + public ReadOptions build() +} +``` + +### WrappedInputFile + +Wrapper that adapts Iceberg's `InputFile` interface to Comet's file reading infrastructure. + +```java +// Constructor +public WrappedInputFile(org.apache.iceberg.io.InputFile inputFile) Review Comment: https://github.com/apache/datafusion-comet/blob/640dd03b17838edac6fc9beed8bde9b383888d82/common/src/main/java/org/apache/comet/parquet/WrappedInputFile.java#L40 actually accepts `java.lang.Object` ########## common/src/main/java/org/apache/comet/parquet/ReadOptions.java: ########## Review Comment: Maybe this constructor should be annotated too ? Or hide it by reducing its visibility and always go thru `.builder(conf)` ? ########## iceberg-public-api/src/test/java/org/apache/comet/iceberg/api/parquet/AbstractColumnReaderApiTest.java: ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.iceberg.api.parquet; + +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; + +import org.junit.Test; + +import org.apache.comet.iceberg.api.AbstractApiTest; +import org.apache.comet.parquet.AbstractColumnReader; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for the AbstractColumnReader public API. */ +public class AbstractColumnReaderApiTest extends AbstractApiTest { + + @Test + public void testAbstractColumnReaderHasIcebergApiAnnotation() { + assertThat(hasIcebergApiAnnotation(AbstractColumnReader.class)).isTrue(); + } + + @Test + public void testAbstractColumnReaderIsPublic() { + assertThat(isPublic(AbstractColumnReader.class)).isTrue(); + } + + @Test + public void testAbstractColumnReaderIsAbstract() { + assertThat(Modifier.isAbstract(AbstractColumnReader.class.getModifiers())).isTrue(); + } + + @Test + public void testImplementsAutoCloseable() { + assertThat(AutoCloseable.class.isAssignableFrom(AbstractColumnReader.class)).isTrue(); + } + + @Test + public void testSetBatchSizeMethodExists() throws NoSuchMethodException { + Method method = AbstractColumnReader.class.getMethod("setBatchSize", int.class); + assertThat(method).isNotNull(); + assertThat(hasIcebergApiAnnotation(method)).isTrue(); + } + + @Test + public void testCloseMethodExists() throws NoSuchMethodException { + Method method = AbstractColumnReader.class.getMethod("close"); + assertThat(method).isNotNull(); Review Comment: ```suggestion ``` ########## docs/source/contributor-guide/iceberg_public_api.md: ########## @@ -0,0 +1,342 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Public API for Apache Iceberg Integration + +This document describes the Comet classes and methods that form the public API used by +[Apache Iceberg](https://iceberg.apache.org/). These APIs enable Iceberg to leverage Comet's +native Parquet reader for vectorized reads in Spark. + +**Important**: Changes to these APIs may break Iceberg's Comet integration. Contributors should +exercise caution when modifying these classes and consider backward compatibility. + +All classes and methods documented here are marked with the `@IcebergApi` annotation +(`org.apache.comet.IcebergApi`) to make them easily identifiable in the source code. 
+ +## Overview + +Iceberg uses Comet's Parquet reading infrastructure to accelerate table scans. The integration +uses two approaches: + +1. **Hybrid Reader**: Native Parquet decoding with JVM-based I/O (requires building Iceberg + from source with Comet patches) +2. **Native Reader**: Fully-native Iceberg scans using iceberg-rust + +This document focuses on the Hybrid Reader API, which is used when Iceberg is configured with +`spark.sql.iceberg.parquet.reader-type=COMET`. + +## Package: `org.apache.comet.parquet` + +### FileReader + +Main class for reading Parquet files with native decoding. + +```java +// Constructor +public FileReader( + WrappedInputFile inputFile, + ReadOptions options, + Map<String, String> properties, + Long start, + Long length, + byte[] fileEncryptionKey, + byte[] fileAADPrefix +) throws IOException + +// Methods used by Iceberg +public void setRequestedSchemaFromSpecs(List<ParquetColumnSpec> specs) +public RowGroupReader readNextRowGroup() throws IOException +public void skipNextRowGroup() +public void close() throws IOException +``` + +### RowGroupReader + +Provides access to row group data. + +```java +// Methods used by Iceberg +public long getRowCount() +``` + +### ReadOptions + +Configuration for Parquet read operations. + +```java +// Builder pattern +public static Builder builder(Configuration conf) + +public class Builder { + public ReadOptions build() +} +``` + +### WrappedInputFile + +Wrapper that adapts Iceberg's `InputFile` interface to Comet's file reading infrastructure. + +```java +// Constructor +public WrappedInputFile(org.apache.iceberg.io.InputFile inputFile) +``` + +### ParquetColumnSpec + +Specification describing a Parquet column's schema information. + +```java +// Constructor +public ParquetColumnSpec( + int fieldId, + String[] path, + String physicalType, + int typeLength, + boolean isRepeated, + int maxDefinitionLevel, + int maxRepetitionLevel, + String logicalTypeName, + Map<String, String> logicalTypeParams +) + +// Getters used by Iceberg +public int getFieldId() +public String[] getPath() +public String getPhysicalType() +public int getTypeLength() +public int getMaxDefinitionLevel() +public int getMaxRepetitionLevel() +public String getLogicalTypeName() +public Map<String, String> getLogicalTypeParams() +``` + +### AbstractColumnReader + +Base class for column readers. + +```java +// Protected field accessed by Iceberg subclasses +protected long nativeHandle + +// Methods used by Iceberg +public void setBatchSize(int batchSize) +public void close() +``` + +### ColumnReader + +Column reader for regular Parquet columns (extends `AbstractColumnReader`). + +```java +// Methods used by Iceberg +public void setPageReader(PageReader pageReader) throws IOException +``` + +### BatchReader + +Coordinates reading batches across multiple column readers. + +```java +// Constructor +public BatchReader(AbstractColumnReader[] columnReaders) + +// Methods used by Iceberg +public void setSparkSchema(StructType schema) +public AbstractColumnReader[] getColumnReaders() +public void nextBatch(int batchSize) +``` + +### MetadataColumnReader + +Reader for metadata columns (used for Iceberg's delete and position columns). 
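As an editorial illustration only (not part of the patch under review): a hedged sketch of how an Iceberg-side subclass might drive the delete-column flow, assuming the `MetadataColumnReader` constructor and methods quoted in the signature block just below, plus the `Native` helpers documented later in this guide, are accurate. Class and variable names here are hypothetical.

```java
// Hypothetical Iceberg-side subclass sketch; assumes the signatures documented in this guide.
import org.apache.comet.parquet.MetadataColumnReader;
import org.apache.comet.parquet.Native;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.spark.sql.types.DataTypes;

class DeleteColumnReaderSketch extends MetadataColumnReader {
  DeleteColumnReaderSketch(ColumnDescriptor descriptor) {
    // Boolean metadata column for Iceberg's _deleted flag; not constant.
    super(DataTypes.BooleanType, descriptor, /* useDecimal128 */ false, /* isConstant */ false);
  }

  void markDeleted(boolean[] isDeleted) {
    // nativeHandle is the protected field documented for subclass access.
    Native.setIsDeleted(nativeHandle, isDeleted);
    // Materialize a batch of the same length as the delete flags.
    readBatch(isDeleted.length);
  }
}
```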
+ +```java +// Constructor +public MetadataColumnReader( + DataType sparkType, + ColumnDescriptor descriptor, + boolean useDecimal128, + boolean isConstant +) + +// Methods used by Iceberg +public void readBatch(int total) +public CometVector currentBatch() + +// Protected field accessed by subclasses +protected long nativeHandle +``` + +### ConstantColumnReader + +Reader for columns with constant/default values (extends `MetadataColumnReader`). + +```java +// Constructor +public ConstantColumnReader( + DataType sparkType, + ColumnDescriptor descriptor, + Object value, + boolean useDecimal128 +) +``` + +### Native + +JNI interface for native operations. + +```java +// Static methods used by Iceberg +public static void resetBatch(long nativeHandle) +public static void setIsDeleted(long nativeHandle, boolean[] isDeleted) +public static void setPosition(long nativeHandle, long position, int total) +``` + +### TypeUtil + +Utilities for Parquet type conversions. + +```java +// Methods used by Iceberg +public static ColumnDescriptor convertToParquet(StructField sparkField) +``` + +### Utils + +General utility methods. + +```java +// Methods used by Iceberg +public static AbstractColumnReader getColumnReader( + DataType sparkType, + ColumnDescriptor descriptor, + CometSchemaImporter importer, + int batchSize, + boolean useDecimal128, + boolean isConstant +) +``` + +## Package: `org.apache.comet` + +### CometSchemaImporter + +Imports and converts schemas between Arrow and Spark formats. + +```java +// Constructor +public CometSchemaImporter(RootAllocator allocator) + +// Methods used by Iceberg (inherited from AbstractCometSchemaImporter) +public void close() +``` + +## Package: `org.apache.arrow.c` + +### AbstractCometSchemaImporter + +Base class for `CometSchemaImporter`. + +```java +// Methods used by Iceberg +public void close() +``` + +## Package: `org.apache.comet.vector` + +### CometVector + +Base class for Comet's columnar vectors (extends Spark's `ColumnVector`). + +```java +// Constructor +public CometVector(DataType type, boolean useDecimal128) + +// Abstract methods that subclasses must implement +public abstract int numValues() +public abstract ValueVector getValueVector() +public abstract CometVector slice(int offset, int length) +public abstract void setNumNulls(int numNulls) +public abstract void setNumValues(int numValues) + +// Inherited from Spark ColumnVector - commonly overridden +public abstract void close() +public abstract boolean hasNull() +public abstract int numNulls() +public abstract boolean isNullAt(int rowId) +public abstract boolean getBoolean(int rowId) +// ... other type-specific getters +``` + +## Package: `org.apache.comet.shaded.arrow.memory` + +### RootAllocator + +Arrow memory allocator (shaded to avoid conflicts). + +```java +// Constructor used by Iceberg +public RootAllocator() +``` + +## Package: `org.apache.comet.shaded.arrow.vector` + +### ValueVector + +Arrow's base vector interface (shaded). Used as return type in `CometVector.getValueVector()`. + +## How Iceberg Uses These APIs + +### Parquet File Reading Flow + +1. Iceberg creates a `WrappedInputFile` from its `InputFile` +2. Creates `ReadOptions` via builder pattern +3. Instantiates `FileReader` with the wrapped input file +4. Converts Parquet `ColumnDescriptor`s to `ParquetColumnSpec`s using `CometTypeUtils` +5. Calls `setRequestedSchemaFromSpecs()` to specify which columns to read +6. Iterates through row groups via `readNextRowGroup()` and `skipNextRowGroup()` + +### Column Reading Flow + +1. 
Creates `CometSchemaImporter` with a `RootAllocator` +2. Uses `Utils.getColumnReader()` to create appropriate column readers +3. Calls `reset()` and `setPageReader()` for each row group Review Comment: Which `reset()` method ? Maybe: ```suggestion 3. Calls `Native.resetBatch()` and `setPageReader()` for each row group ``` ?! ########## common/src/main/java/org/apache/comet/parquet/BatchReader.java: ########## @@ -189,6 +191,7 @@ public BatchReader( * @deprecated since 0.10.0, will be removed in 0.11.0. Review Comment: Maybe it should be undeprecated ?! Currently is is both deprecated and important (used by Iceberg) ########## iceberg-public-api/src/test/java/org/apache/comet/iceberg/api/AbstractApiTest.java: ########## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.iceberg.api; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Comparator; + +import org.junit.After; +import org.junit.Before; + +import org.apache.comet.IcebergApi; + +/** + * Base class for Iceberg API tests. Provides common utilities for testing annotated API elements. + */ +public abstract class AbstractApiTest { + + protected Path tempDir; + + @Before + public void setUp() throws IOException { + tempDir = Files.createTempDirectory("iceberg-api-test"); + } + + @After + public void tearDown() throws IOException { + if (tempDir != null && Files.exists(tempDir)) { + Files.walk(tempDir).sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete); Review Comment: ```suggestion try (Stream<Path> stream : Files.walk(tempDir)) { stream.sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete); } ``` https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/nio/file/Files.html#walk(java.nio.file.Path,java.nio.file.FileVisitOption...) says: ``` The returned stream contains references to one or more open directories. The directories are closed by closing the stream. ``` ########## common/src/main/java/org/apache/comet/parquet/ParquetColumnSpec.java: ########## Review Comment: It seems all other getters in this class are used/annotated but this one is not. Just checking that it is not missed by mistake. ########## iceberg-public-api/src/test/java/org/apache/comet/iceberg/api/AbstractApiTest.java: ########## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.iceberg.api; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Comparator; + +import org.junit.After; +import org.junit.Before; + +import org.apache.comet.IcebergApi; + +/** + * Base class for Iceberg API tests. Provides common utilities for testing annotated API elements. + */ +public abstract class AbstractApiTest { + + protected Path tempDir; + + @Before + public void setUp() throws IOException { + tempDir = Files.createTempDirectory("iceberg-api-test"); + } + + @After + public void tearDown() throws IOException { + if (tempDir != null && Files.exists(tempDir)) { + Files.walk(tempDir).sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete); + } + } + + /** Checks if a class has the @IcebergApi annotation. */ + protected static boolean hasIcebergApiAnnotation(Class<?> clazz) { + return clazz.isAnnotationPresent(IcebergApi.class); + } + + /** Checks if a method has the @IcebergApi annotation. */ + protected static boolean hasIcebergApiAnnotation(Method method) { + return method.isAnnotationPresent(IcebergApi.class); + } + + /** Checks if a constructor has the @IcebergApi annotation. */ + protected static boolean hasIcebergApiAnnotation(Constructor<?> constructor) { + return constructor.isAnnotationPresent(IcebergApi.class); + } + + /** Checks if a field has the @IcebergApi annotation. */ + protected static boolean hasIcebergApiAnnotation(Field field) { + return field.isAnnotationPresent(IcebergApi.class); + } + + /** Checks if a class is public. */ + protected static boolean isPublic(Class<?> clazz) { + return Modifier.isPublic(clazz.getModifiers()); + } + + /** Checks if a method is public. */ + protected static boolean isPublic(Method method) { + return Modifier.isPublic(method.getModifiers()); + } + + /** Checks if a constructor is public. */ + protected static boolean isPublic(Constructor<?> constructor) { + return Modifier.isPublic(constructor.getModifiers()); + } + + /** Checks if a field is public or protected. */ + protected static boolean isAccessible(Field field) { + int modifiers = field.getModifiers(); + return Modifier.isPublic(modifiers) || Modifier.isProtected(modifiers); + } + + /** Checks if native library is available. */ + protected static boolean isNativeLibraryAvailable() { + try { + Class.forName("org.apache.comet.NativeBase"); Review Comment: This does not guarantee that the native library is available and loadable. ########## iceberg-public-api/src/test/java/org/apache/comet/iceberg/api/parquet/AbstractColumnReaderApiTest.java: ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.iceberg.api.parquet; + +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; + +import org.junit.Test; + +import org.apache.comet.iceberg.api.AbstractApiTest; +import org.apache.comet.parquet.AbstractColumnReader; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for the AbstractColumnReader public API. */ +public class AbstractColumnReaderApiTest extends AbstractApiTest { + + @Test + public void testAbstractColumnReaderHasIcebergApiAnnotation() { + assertThat(hasIcebergApiAnnotation(AbstractColumnReader.class)).isTrue(); + } + + @Test + public void testAbstractColumnReaderIsPublic() { + assertThat(isPublic(AbstractColumnReader.class)).isTrue(); + } + + @Test + public void testAbstractColumnReaderIsAbstract() { + assertThat(Modifier.isAbstract(AbstractColumnReader.class.getModifiers())).isTrue(); + } + + @Test + public void testImplementsAutoCloseable() { + assertThat(AutoCloseable.class.isAssignableFrom(AbstractColumnReader.class)).isTrue(); + } + + @Test + public void testSetBatchSizeMethodExists() throws NoSuchMethodException { + Method method = AbstractColumnReader.class.getMethod("setBatchSize", int.class); + assertThat(method).isNotNull(); + assertThat(hasIcebergApiAnnotation(method)).isTrue(); + } + + @Test + public void testCloseMethodExists() throws NoSuchMethodException { + Method method = AbstractColumnReader.class.getMethod("close"); + assertThat(method).isNotNull(); + assertThat(hasIcebergApiAnnotation(method)).isTrue(); + } + + @Test + public void testReadBatchMethodExists() throws NoSuchMethodException { + Method method = AbstractColumnReader.class.getMethod("readBatch", int.class); + assertThat(method).isNotNull(); + assertThat(Modifier.isAbstract(method.getModifiers())).isTrue(); + } + + @Test + public void testCurrentBatchMethodExists() throws NoSuchMethodException { + Method method = AbstractColumnReader.class.getMethod("currentBatch"); + assertThat(method).isNotNull(); Review Comment: ```suggestion ``` ########## iceberg-public-api/src/test/java/org/apache/comet/iceberg/api/parquet/BatchReaderApiTest.java: ########## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.iceberg.api.parquet; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Method; + +import org.junit.Test; + +import org.apache.spark.sql.types.StructType; + +import org.apache.comet.iceberg.api.AbstractApiTest; +import org.apache.comet.parquet.AbstractColumnReader; +import org.apache.comet.parquet.BatchReader; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for the BatchReader public API. */ +public class BatchReaderApiTest extends AbstractApiTest { + + @Test + public void testBatchReaderHasIcebergApiAnnotation() { + assertThat(hasIcebergApiAnnotation(BatchReader.class)).isTrue(); + } + + @Test + public void testBatchReaderIsPublic() { + assertThat(isPublic(BatchReader.class)).isTrue(); + } + + @Test + public void testConstructorWithColumnReadersExists() throws NoSuchMethodException { + Constructor<?> constructor = BatchReader.class.getConstructor(AbstractColumnReader[].class); + assertThat(constructor).isNotNull(); Review Comment: getConstructor() either throws or succeeds ```suggestion ``` ########## common/src/main/java/org/apache/comet/parquet/WrappedInputFile.java: ########## Review Comment: ```suggestion Method targetMethod = wrapped.getClass().getDeclaredMethod("getLength"); ``` ########## iceberg-public-api/src/test/java/org/apache/comet/iceberg/api/parquet/BatchReaderApiTest.java: ########## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.iceberg.api.parquet; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Method; + +import org.junit.Test; + +import org.apache.spark.sql.types.StructType; + +import org.apache.comet.iceberg.api.AbstractApiTest; +import org.apache.comet.parquet.AbstractColumnReader; +import org.apache.comet.parquet.BatchReader; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for the BatchReader public API. 
*/ +public class BatchReaderApiTest extends AbstractApiTest { + + @Test + public void testBatchReaderHasIcebergApiAnnotation() { + assertThat(hasIcebergApiAnnotation(BatchReader.class)).isTrue(); + } + + @Test + public void testBatchReaderIsPublic() { + assertThat(isPublic(BatchReader.class)).isTrue(); + } + + @Test + public void testConstructorWithColumnReadersExists() throws NoSuchMethodException { + Constructor<?> constructor = BatchReader.class.getConstructor(AbstractColumnReader[].class); + assertThat(constructor).isNotNull(); + assertThat(hasIcebergApiAnnotation(constructor)).isTrue(); + } + + @Test + public void testSetSparkSchemaMethodExists() throws NoSuchMethodException { + Method method = BatchReader.class.getMethod("setSparkSchema", StructType.class); + assertThat(method).isNotNull(); + assertThat(hasIcebergApiAnnotation(method)).isTrue(); + } + + @Test + public void testGetColumnReadersMethodExists() throws NoSuchMethodException { + Method method = BatchReader.class.getMethod("getColumnReaders"); + assertThat(method).isNotNull(); + assertThat(hasIcebergApiAnnotation(method)).isTrue(); + assertThat(method.getReturnType()).isEqualTo(AbstractColumnReader[].class); + } + + @Test + public void testNextBatchWithSizeMethodExists() throws NoSuchMethodException { + Method method = BatchReader.class.getMethod("nextBatch", int.class); + assertThat(method).isNotNull(); Review Comment: ```suggestion ``` ########## iceberg-public-api/src/test/java/org/apache/comet/iceberg/api/parquet/AbstractColumnReaderApiTest.java: ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.iceberg.api.parquet; + +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; + +import org.junit.Test; + +import org.apache.comet.iceberg.api.AbstractApiTest; +import org.apache.comet.parquet.AbstractColumnReader; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for the AbstractColumnReader public API. 
*/ +public class AbstractColumnReaderApiTest extends AbstractApiTest { + + @Test + public void testAbstractColumnReaderHasIcebergApiAnnotation() { + assertThat(hasIcebergApiAnnotation(AbstractColumnReader.class)).isTrue(); + } + + @Test + public void testAbstractColumnReaderIsPublic() { + assertThat(isPublic(AbstractColumnReader.class)).isTrue(); + } + + @Test + public void testAbstractColumnReaderIsAbstract() { + assertThat(Modifier.isAbstract(AbstractColumnReader.class.getModifiers())).isTrue(); + } + + @Test + public void testImplementsAutoCloseable() { + assertThat(AutoCloseable.class.isAssignableFrom(AbstractColumnReader.class)).isTrue(); + } + + @Test + public void testSetBatchSizeMethodExists() throws NoSuchMethodException { + Method method = AbstractColumnReader.class.getMethod("setBatchSize", int.class); + assertThat(method).isNotNull(); + assertThat(hasIcebergApiAnnotation(method)).isTrue(); + } + + @Test + public void testCloseMethodExists() throws NoSuchMethodException { + Method method = AbstractColumnReader.class.getMethod("close"); + assertThat(method).isNotNull(); + assertThat(hasIcebergApiAnnotation(method)).isTrue(); + } + + @Test + public void testReadBatchMethodExists() throws NoSuchMethodException { + Method method = AbstractColumnReader.class.getMethod("readBatch", int.class); + assertThat(method).isNotNull(); Review Comment: ```suggestion ``` ########## iceberg-public-api/src/test/java/org/apache/comet/iceberg/api/parquet/BatchReaderApiTest.java: ########## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.iceberg.api.parquet; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Method; + +import org.junit.Test; + +import org.apache.spark.sql.types.StructType; + +import org.apache.comet.iceberg.api.AbstractApiTest; +import org.apache.comet.parquet.AbstractColumnReader; +import org.apache.comet.parquet.BatchReader; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for the BatchReader public API. 
*/ +public class BatchReaderApiTest extends AbstractApiTest { + + @Test + public void testBatchReaderHasIcebergApiAnnotation() { + assertThat(hasIcebergApiAnnotation(BatchReader.class)).isTrue(); + } + + @Test + public void testBatchReaderIsPublic() { + assertThat(isPublic(BatchReader.class)).isTrue(); + } + + @Test + public void testConstructorWithColumnReadersExists() throws NoSuchMethodException { + Constructor<?> constructor = BatchReader.class.getConstructor(AbstractColumnReader[].class); + assertThat(constructor).isNotNull(); + assertThat(hasIcebergApiAnnotation(constructor)).isTrue(); + } + + @Test + public void testSetSparkSchemaMethodExists() throws NoSuchMethodException { + Method method = BatchReader.class.getMethod("setSparkSchema", StructType.class); + assertThat(method).isNotNull(); + assertThat(hasIcebergApiAnnotation(method)).isTrue(); + } + + @Test + public void testGetColumnReadersMethodExists() throws NoSuchMethodException { + Method method = BatchReader.class.getMethod("getColumnReaders"); + assertThat(method).isNotNull(); Review Comment: ```suggestion ``` ########## iceberg-public-api/src/test/java/org/apache/comet/iceberg/api/parquet/AbstractColumnReaderApiTest.java: ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.iceberg.api.parquet; + +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; + +import org.junit.Test; + +import org.apache.comet.iceberg.api.AbstractApiTest; +import org.apache.comet.parquet.AbstractColumnReader; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for the AbstractColumnReader public API. */ +public class AbstractColumnReaderApiTest extends AbstractApiTest { + + @Test + public void testAbstractColumnReaderHasIcebergApiAnnotation() { + assertThat(hasIcebergApiAnnotation(AbstractColumnReader.class)).isTrue(); + } + + @Test + public void testAbstractColumnReaderIsPublic() { + assertThat(isPublic(AbstractColumnReader.class)).isTrue(); + } + + @Test + public void testAbstractColumnReaderIsAbstract() { + assertThat(Modifier.isAbstract(AbstractColumnReader.class.getModifiers())).isTrue(); + } + + @Test + public void testImplementsAutoCloseable() { + assertThat(AutoCloseable.class.isAssignableFrom(AbstractColumnReader.class)).isTrue(); + } + + @Test + public void testSetBatchSizeMethodExists() throws NoSuchMethodException { + Method method = AbstractColumnReader.class.getMethod("setBatchSize", int.class); + assertThat(method).isNotNull(); Review Comment: This is always true. 
`getMethod()` will throw if there is no such method ```suggestion ``` ########## iceberg-public-api/src/test/java/org/apache/comet/iceberg/api/parquet/BatchReaderApiTest.java: ########## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.iceberg.api.parquet; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Method; + +import org.junit.Test; + +import org.apache.spark.sql.types.StructType; + +import org.apache.comet.iceberg.api.AbstractApiTest; +import org.apache.comet.parquet.AbstractColumnReader; +import org.apache.comet.parquet.BatchReader; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for the BatchReader public API. */ +public class BatchReaderApiTest extends AbstractApiTest { + + @Test + public void testBatchReaderHasIcebergApiAnnotation() { + assertThat(hasIcebergApiAnnotation(BatchReader.class)).isTrue(); + } + + @Test + public void testBatchReaderIsPublic() { + assertThat(isPublic(BatchReader.class)).isTrue(); + } + + @Test + public void testConstructorWithColumnReadersExists() throws NoSuchMethodException { + Constructor<?> constructor = BatchReader.class.getConstructor(AbstractColumnReader[].class); + assertThat(constructor).isNotNull(); + assertThat(hasIcebergApiAnnotation(constructor)).isTrue(); + } + + @Test + public void testSetSparkSchemaMethodExists() throws NoSuchMethodException { + Method method = BatchReader.class.getMethod("setSparkSchema", StructType.class); + assertThat(method).isNotNull(); Review Comment: ```suggestion ``` ########## docs/source/contributor-guide/iceberg_public_api.md: ########## @@ -0,0 +1,342 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Public API for Apache Iceberg Integration + +This document describes the Comet classes and methods that form the public API used by +[Apache Iceberg](https://iceberg.apache.org/). These APIs enable Iceberg to leverage Comet's +native Parquet reader for vectorized reads in Spark. 
+ +**Important**: Changes to these APIs may break Iceberg's Comet integration. Contributors should +exercise caution when modifying these classes and consider backward compatibility. + +All classes and methods documented here are marked with the `@IcebergApi` annotation +(`org.apache.comet.IcebergApi`) to make them easily identifiable in the source code. + +## Overview + +Iceberg uses Comet's Parquet reading infrastructure to accelerate table scans. The integration +uses two approaches: + +1. **Hybrid Reader**: Native Parquet decoding with JVM-based I/O (requires building Iceberg + from source with Comet patches) +2. **Native Reader**: Fully-native Iceberg scans using iceberg-rust + +This document focuses on the Hybrid Reader API, which is used when Iceberg is configured with +`spark.sql.iceberg.parquet.reader-type=COMET`. + +## Package: `org.apache.comet.parquet` + +### FileReader + +Main class for reading Parquet files with native decoding. + +```java +// Constructor +public FileReader( + WrappedInputFile inputFile, + ReadOptions options, + Map<String, String> properties, + Long start, + Long length, + byte[] fileEncryptionKey, + byte[] fileAADPrefix +) throws IOException + +// Methods used by Iceberg +public void setRequestedSchemaFromSpecs(List<ParquetColumnSpec> specs) +public RowGroupReader readNextRowGroup() throws IOException +public void skipNextRowGroup() +public void close() throws IOException +``` + +### RowGroupReader + +Provides access to row group data. + +```java +// Methods used by Iceberg +public long getRowCount() +``` + +### ReadOptions + +Configuration for Parquet read operations. + +```java +// Builder pattern +public static Builder builder(Configuration conf) + +public class Builder { + public ReadOptions build() +} +``` + +### WrappedInputFile + +Wrapper that adapts Iceberg's `InputFile` interface to Comet's file reading infrastructure. + +```java +// Constructor +public WrappedInputFile(org.apache.iceberg.io.InputFile inputFile) +``` + +### ParquetColumnSpec + +Specification describing a Parquet column's schema information. + +```java +// Constructor +public ParquetColumnSpec( + int fieldId, + String[] path, + String physicalType, + int typeLength, + boolean isRepeated, + int maxDefinitionLevel, + int maxRepetitionLevel, + String logicalTypeName, + Map<String, String> logicalTypeParams +) + +// Getters used by Iceberg +public int getFieldId() +public String[] getPath() +public String getPhysicalType() +public int getTypeLength() +public int getMaxDefinitionLevel() +public int getMaxRepetitionLevel() +public String getLogicalTypeName() +public Map<String, String> getLogicalTypeParams() +``` + +### AbstractColumnReader + +Base class for column readers. + +```java +// Protected field accessed by Iceberg subclasses +protected long nativeHandle + +// Methods used by Iceberg +public void setBatchSize(int batchSize) +public void close() +``` + +### ColumnReader + +Column reader for regular Parquet columns (extends `AbstractColumnReader`). + +```java +// Methods used by Iceberg +public void setPageReader(PageReader pageReader) throws IOException +``` + +### BatchReader + +Coordinates reading batches across multiple column readers. 
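For illustration, a minimal hedged sketch of wiring column readers into a `BatchReader`, assuming the constructor and methods listed in the signature block just below; the loop shape and variable names are hypothetical, not Iceberg's actual code.

```java
// Hypothetical wiring sketch; assumes the BatchReader signatures documented just below.
import org.apache.comet.parquet.AbstractColumnReader;
import org.apache.comet.parquet.BatchReader;
import org.apache.spark.sql.types.StructType;

class BatchReaderSketch {
  void readAll(AbstractColumnReader[] readers, StructType sparkSchema, int batchSize, long totalRows) {
    BatchReader batchReader = new BatchReader(readers);
    batchReader.setSparkSchema(sparkSchema);
    long remaining = totalRows;
    while (remaining > 0) {
      int size = (int) Math.min(batchSize, remaining);
      // Decode one batch across all column readers.
      batchReader.nextBatch(size);
      // ... consume the column readers' current batches here ...
      remaining -= size;
    }
  }
}
```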
+ +```java +// Constructor +public BatchReader(AbstractColumnReader[] columnReaders) + +// Methods used by Iceberg +public void setSparkSchema(StructType schema) +public AbstractColumnReader[] getColumnReaders() +public void nextBatch(int batchSize) +``` + +### MetadataColumnReader + +Reader for metadata columns (used for Iceberg's delete and position columns). + +```java +// Constructor +public MetadataColumnReader( + DataType sparkType, + ColumnDescriptor descriptor, + boolean useDecimal128, + boolean isConstant +) + +// Methods used by Iceberg +public void readBatch(int total) +public CometVector currentBatch() + +// Protected field accessed by subclasses +protected long nativeHandle +``` + +### ConstantColumnReader + +Reader for columns with constant/default values (extends `MetadataColumnReader`). + +```java +// Constructor +public ConstantColumnReader( + DataType sparkType, + ColumnDescriptor descriptor, + Object value, + boolean useDecimal128 +) +``` + +### Native + +JNI interface for native operations. + +```java +// Static methods used by Iceberg +public static void resetBatch(long nativeHandle) +public static void setIsDeleted(long nativeHandle, boolean[] isDeleted) +public static void setPosition(long nativeHandle, long position, int total) +``` + +### TypeUtil + +Utilities for Parquet type conversions. + +```java +// Methods used by Iceberg +public static ColumnDescriptor convertToParquet(StructField sparkField) +``` + +### Utils + +General utility methods. + +```java +// Methods used by Iceberg +public static AbstractColumnReader getColumnReader( + DataType sparkType, + ColumnDescriptor descriptor, + CometSchemaImporter importer, + int batchSize, + boolean useDecimal128, + boolean isConstant +) +``` + +## Package: `org.apache.comet` + +### CometSchemaImporter + +Imports and converts schemas between Arrow and Spark formats. + +```java +// Constructor +public CometSchemaImporter(RootAllocator allocator) + +// Methods used by Iceberg (inherited from AbstractCometSchemaImporter) +public void close() +``` + +## Package: `org.apache.arrow.c` + +### AbstractCometSchemaImporter + +Base class for `CometSchemaImporter`. + +```java +// Methods used by Iceberg +public void close() +``` + +## Package: `org.apache.comet.vector` + +### CometVector + +Base class for Comet's columnar vectors (extends Spark's `ColumnVector`). + +```java +// Constructor +public CometVector(DataType type, boolean useDecimal128) + +// Abstract methods that subclasses must implement +public abstract int numValues() +public abstract ValueVector getValueVector() +public abstract CometVector slice(int offset, int length) +public abstract void setNumNulls(int numNulls) +public abstract void setNumValues(int numValues) + +// Inherited from Spark ColumnVector - commonly overridden +public abstract void close() +public abstract boolean hasNull() +public abstract int numNulls() +public abstract boolean isNullAt(int rowId) +public abstract boolean getBoolean(int rowId) +// ... other type-specific getters +``` + +## Package: `org.apache.comet.shaded.arrow.memory` + +### RootAllocator + +Arrow memory allocator (shaded to avoid conflicts). + +```java +// Constructor used by Iceberg +public RootAllocator() +``` + +## Package: `org.apache.comet.shaded.arrow.vector` + +### ValueVector + +Arrow's base vector interface (shaded). Used as return type in `CometVector.getValueVector()`. + +## How Iceberg Uses These APIs + +### Parquet File Reading Flow + +1. Iceberg creates a `WrappedInputFile` from its `InputFile` +2. 
Creates `ReadOptions` via builder pattern +3. Instantiates `FileReader` with the wrapped input file +4. Converts Parquet `ColumnDescriptor`s to `ParquetColumnSpec`s using `CometTypeUtils` Review Comment: ```suggestion 4. Converts Parquet `ColumnDescriptor`s to `ParquetColumnSpec`s using `TypeUtil` ``` ? ########## common/src/main/java/org/apache/comet/parquet/WrappedInputFile.java: ########## Review Comment: ```suggestion Method targetMethod = wrapped.getClass().getDeclaredMethod("newStream"); ``` ########## iceberg-public-api/src/test/java/org/apache/comet/iceberg/api/parquet/AbstractColumnReaderApiTest.java: ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.iceberg.api.parquet; + +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; + +import org.junit.Test; + +import org.apache.comet.iceberg.api.AbstractApiTest; +import org.apache.comet.parquet.AbstractColumnReader; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for the AbstractColumnReader public API. 
*/ +public class AbstractColumnReaderApiTest extends AbstractApiTest { + + @Test + public void testAbstractColumnReaderHasIcebergApiAnnotation() { + assertThat(hasIcebergApiAnnotation(AbstractColumnReader.class)).isTrue(); + } + + @Test + public void testAbstractColumnReaderIsPublic() { + assertThat(isPublic(AbstractColumnReader.class)).isTrue(); + } + + @Test + public void testAbstractColumnReaderIsAbstract() { + assertThat(Modifier.isAbstract(AbstractColumnReader.class.getModifiers())).isTrue(); + } + + @Test + public void testImplementsAutoCloseable() { + assertThat(AutoCloseable.class.isAssignableFrom(AbstractColumnReader.class)).isTrue(); + } + + @Test + public void testSetBatchSizeMethodExists() throws NoSuchMethodException { + Method method = AbstractColumnReader.class.getMethod("setBatchSize", int.class); + assertThat(method).isNotNull(); + assertThat(hasIcebergApiAnnotation(method)).isTrue(); + } + + @Test + public void testCloseMethodExists() throws NoSuchMethodException { + Method method = AbstractColumnReader.class.getMethod("close"); + assertThat(method).isNotNull(); + assertThat(hasIcebergApiAnnotation(method)).isTrue(); + } + + @Test + public void testReadBatchMethodExists() throws NoSuchMethodException { + Method method = AbstractColumnReader.class.getMethod("readBatch", int.class); + assertThat(method).isNotNull(); + assertThat(Modifier.isAbstract(method.getModifiers())).isTrue(); + } + + @Test + public void testCurrentBatchMethodExists() throws NoSuchMethodException { + Method method = AbstractColumnReader.class.getMethod("currentBatch"); + assertThat(method).isNotNull(); + assertThat(Modifier.isAbstract(method.getModifiers())).isTrue(); + assertThat(method.getReturnType().getSimpleName()).isEqualTo("CometVector"); + } + + @Test + public void testNativeHandleFieldExists() throws NoSuchFieldException { + Field field = AbstractColumnReader.class.getDeclaredField("nativeHandle"); + assertThat(field).isNotNull(); Review Comment: ```suggestion ``` ########## iceberg-public-api/src/test/java/org/apache/comet/iceberg/api/parquet/BatchReaderApiTest.java: ########## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.iceberg.api.parquet; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Method; + +import org.junit.Test; + +import org.apache.spark.sql.types.StructType; + +import org.apache.comet.iceberg.api.AbstractApiTest; +import org.apache.comet.parquet.AbstractColumnReader; +import org.apache.comet.parquet.BatchReader; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for the BatchReader public API. 
*/ +public class BatchReaderApiTest extends AbstractApiTest { + + @Test + public void testBatchReaderHasIcebergApiAnnotation() { + assertThat(hasIcebergApiAnnotation(BatchReader.class)).isTrue(); + } + + @Test + public void testBatchReaderIsPublic() { + assertThat(isPublic(BatchReader.class)).isTrue(); + } + + @Test + public void testConstructorWithColumnReadersExists() throws NoSuchMethodException { + Constructor<?> constructor = BatchReader.class.getConstructor(AbstractColumnReader[].class); + assertThat(constructor).isNotNull(); + assertThat(hasIcebergApiAnnotation(constructor)).isTrue(); + } + + @Test + public void testSetSparkSchemaMethodExists() throws NoSuchMethodException { + Method method = BatchReader.class.getMethod("setSparkSchema", StructType.class); + assertThat(method).isNotNull(); + assertThat(hasIcebergApiAnnotation(method)).isTrue(); + } + + @Test + public void testGetColumnReadersMethodExists() throws NoSuchMethodException { + Method method = BatchReader.class.getMethod("getColumnReaders"); + assertThat(method).isNotNull(); + assertThat(hasIcebergApiAnnotation(method)).isTrue(); + assertThat(method.getReturnType()).isEqualTo(AbstractColumnReader[].class); + } + + @Test + public void testNextBatchWithSizeMethodExists() throws NoSuchMethodException { + Method method = BatchReader.class.getMethod("nextBatch", int.class); + assertThat(method).isNotNull(); + assertThat(hasIcebergApiAnnotation(method)).isTrue(); + assertThat(method.getReturnType()).isEqualTo(boolean.class); + } + + @Test + public void testCurrentBatchMethodExists() throws NoSuchMethodException { + Method method = BatchReader.class.getMethod("currentBatch"); + assertThat(method).isNotNull(); Review Comment: ```suggestion ``` ########## docs/source/contributor-guide/iceberg_public_api.md: ########## @@ -0,0 +1,342 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Public API for Apache Iceberg Integration + +This document describes the Comet classes and methods that form the public API used by +[Apache Iceberg](https://iceberg.apache.org/). These APIs enable Iceberg to leverage Comet's +native Parquet reader for vectorized reads in Spark. + +**Important**: Changes to these APIs may break Iceberg's Comet integration. Contributors should +exercise caution when modifying these classes and consider backward compatibility. + +All classes and methods documented here are marked with the `@IcebergApi` annotation +(`org.apache.comet.IcebergApi`) to make them easily identifiable in the source code. + +## Overview + +Iceberg uses Comet's Parquet reading infrastructure to accelerate table scans. The integration +uses two approaches: + +1. **Hybrid Reader**: Native Parquet decoding with JVM-based I/O (requires building Iceberg + from source with Comet patches) +2. 
**Native Reader**: Fully-native Iceberg scans using iceberg-rust + +This document focuses on the Hybrid Reader API, which is used when Iceberg is configured with +`spark.sql.iceberg.parquet.reader-type=COMET`. + +## Package: `org.apache.comet.parquet` + +### FileReader + +Main class for reading Parquet files with native decoding. + +```java +// Constructor +public FileReader( + WrappedInputFile inputFile, + ReadOptions options, + Map<String, String> properties, + Long start, + Long length, + byte[] fileEncryptionKey, + byte[] fileAADPrefix +) throws IOException + +// Methods used by Iceberg +public void setRequestedSchemaFromSpecs(List<ParquetColumnSpec> specs) +public RowGroupReader readNextRowGroup() throws IOException +public void skipNextRowGroup() +public void close() throws IOException +``` + +### RowGroupReader + +Provides access to row group data. + +```java +// Methods used by Iceberg +public long getRowCount() +``` + +### ReadOptions + +Configuration for Parquet read operations. + +```java +// Builder pattern +public static Builder builder(Configuration conf) + +public class Builder { + public ReadOptions build() +} +``` + +### WrappedInputFile + +Wrapper that adapts Iceberg's `InputFile` interface to Comet's file reading infrastructure. + +```java +// Constructor +public WrappedInputFile(org.apache.iceberg.io.InputFile inputFile) +``` + +### ParquetColumnSpec + +Specification describing a Parquet column's schema information. + +```java +// Constructor +public ParquetColumnSpec( + int fieldId, + String[] path, + String physicalType, + int typeLength, + boolean isRepeated, + int maxDefinitionLevel, + int maxRepetitionLevel, + String logicalTypeName, + Map<String, String> logicalTypeParams +) + +// Getters used by Iceberg +public int getFieldId() +public String[] getPath() +public String getPhysicalType() +public int getTypeLength() +public int getMaxDefinitionLevel() +public int getMaxRepetitionLevel() +public String getLogicalTypeName() +public Map<String, String> getLogicalTypeParams() +``` + +### AbstractColumnReader + +Base class for column readers. + +```java +// Protected field accessed by Iceberg subclasses +protected long nativeHandle + +// Methods used by Iceberg +public void setBatchSize(int batchSize) +public void close() +``` + +### ColumnReader + +Column reader for regular Parquet columns (extends `AbstractColumnReader`). + +```java +// Methods used by Iceberg +public void setPageReader(PageReader pageReader) throws IOException +``` + +### BatchReader + +Coordinates reading batches across multiple column readers. + +```java +// Constructor +public BatchReader(AbstractColumnReader[] columnReaders) + +// Methods used by Iceberg +public void setSparkSchema(StructType schema) +public AbstractColumnReader[] getColumnReaders() +public void nextBatch(int batchSize) +``` + +### MetadataColumnReader + +Reader for metadata columns (used for Iceberg's delete and position columns). + +```java +// Constructor +public MetadataColumnReader( + DataType sparkType, + ColumnDescriptor descriptor, + boolean useDecimal128, + boolean isConstant +) + +// Methods used by Iceberg +public void readBatch(int total) +public CometVector currentBatch() + +// Protected field accessed by subclasses +protected long nativeHandle +``` + +### ConstantColumnReader + +Reader for columns with constant/default values (extends `MetadataColumnReader`). 
+ +```java +// Constructor +public ConstantColumnReader( + DataType sparkType, + ColumnDescriptor descriptor, + Object value, + boolean useDecimal128 +) +``` + +### Native + +JNI interface for native operations. + +```java +// Static methods used by Iceberg +public static void resetBatch(long nativeHandle) +public static void setIsDeleted(long nativeHandle, boolean[] isDeleted) +public static void setPosition(long nativeHandle, long position, int total) +``` + +### TypeUtil + +Utilities for Parquet type conversions. + +```java +// Methods used by Iceberg +public static ColumnDescriptor convertToParquet(StructField sparkField) +``` + +### Utils + +General utility methods. + +```java +// Methods used by Iceberg +public static AbstractColumnReader getColumnReader( + DataType sparkType, + ColumnDescriptor descriptor, + CometSchemaImporter importer, + int batchSize, + boolean useDecimal128, + boolean isConstant Review Comment: ```suggestion boolean useLazyMaterialization, boolean useLegacyTimestamp ``` ########## iceberg-public-api/src/test/java/org/apache/comet/iceberg/api/parquet/BatchReaderApiTest.java: ########## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.iceberg.api.parquet; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Method; + +import org.junit.Test; + +import org.apache.spark.sql.types.StructType; + +import org.apache.comet.iceberg.api.AbstractApiTest; +import org.apache.comet.parquet.AbstractColumnReader; +import org.apache.comet.parquet.BatchReader; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for the BatchReader public API. 
*/ +public class BatchReaderApiTest extends AbstractApiTest { + + @Test + public void testBatchReaderHasIcebergApiAnnotation() { + assertThat(hasIcebergApiAnnotation(BatchReader.class)).isTrue(); + } + + @Test + public void testBatchReaderIsPublic() { + assertThat(isPublic(BatchReader.class)).isTrue(); + } + + @Test + public void testConstructorWithColumnReadersExists() throws NoSuchMethodException { + Constructor<?> constructor = BatchReader.class.getConstructor(AbstractColumnReader[].class); + assertThat(constructor).isNotNull(); + assertThat(hasIcebergApiAnnotation(constructor)).isTrue(); + } + + @Test + public void testSetSparkSchemaMethodExists() throws NoSuchMethodException { + Method method = BatchReader.class.getMethod("setSparkSchema", StructType.class); + assertThat(method).isNotNull(); + assertThat(hasIcebergApiAnnotation(method)).isTrue(); + } + + @Test + public void testGetColumnReadersMethodExists() throws NoSuchMethodException { + Method method = BatchReader.class.getMethod("getColumnReaders"); + assertThat(method).isNotNull(); + assertThat(hasIcebergApiAnnotation(method)).isTrue(); + assertThat(method.getReturnType()).isEqualTo(AbstractColumnReader[].class); + } + + @Test + public void testNextBatchWithSizeMethodExists() throws NoSuchMethodException { + Method method = BatchReader.class.getMethod("nextBatch", int.class); + assertThat(method).isNotNull(); + assertThat(hasIcebergApiAnnotation(method)).isTrue(); + assertThat(method.getReturnType()).isEqualTo(boolean.class); + } + + @Test + public void testCurrentBatchMethodExists() throws NoSuchMethodException { + Method method = BatchReader.class.getMethod("currentBatch"); + assertThat(method).isNotNull(); + assertThat(method.getReturnType().getSimpleName()).isEqualTo("ColumnarBatch"); + } + + @Test + public void testCloseMethodExists() throws NoSuchMethodException { Review Comment: Is the close method important (used by Iceberg) ? Maybe add `@IcebergApi` to it too ? ########## iceberg-public-api/src/test/java/org/apache/comet/iceberg/api/parquet/BatchReaderApiTest.java: ########## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.iceberg.api.parquet; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Method; + +import org.junit.Test; + +import org.apache.spark.sql.types.StructType; + +import org.apache.comet.iceberg.api.AbstractApiTest; +import org.apache.comet.parquet.AbstractColumnReader; +import org.apache.comet.parquet.BatchReader; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for the BatchReader public API. 
*/ +public class BatchReaderApiTest extends AbstractApiTest { + + @Test + public void testBatchReaderHasIcebergApiAnnotation() { + assertThat(hasIcebergApiAnnotation(BatchReader.class)).isTrue(); + } + + @Test + public void testBatchReaderIsPublic() { + assertThat(isPublic(BatchReader.class)).isTrue(); + } + + @Test + public void testConstructorWithColumnReadersExists() throws NoSuchMethodException { + Constructor<?> constructor = BatchReader.class.getConstructor(AbstractColumnReader[].class); + assertThat(constructor).isNotNull(); + assertThat(hasIcebergApiAnnotation(constructor)).isTrue(); + } + + @Test + public void testSetSparkSchemaMethodExists() throws NoSuchMethodException { + Method method = BatchReader.class.getMethod("setSparkSchema", StructType.class); + assertThat(method).isNotNull(); + assertThat(hasIcebergApiAnnotation(method)).isTrue(); + } + + @Test + public void testGetColumnReadersMethodExists() throws NoSuchMethodException { + Method method = BatchReader.class.getMethod("getColumnReaders"); + assertThat(method).isNotNull(); + assertThat(hasIcebergApiAnnotation(method)).isTrue(); + assertThat(method.getReturnType()).isEqualTo(AbstractColumnReader[].class); + } + + @Test + public void testNextBatchWithSizeMethodExists() throws NoSuchMethodException { + Method method = BatchReader.class.getMethod("nextBatch", int.class); + assertThat(method).isNotNull(); + assertThat(hasIcebergApiAnnotation(method)).isTrue(); + assertThat(method.getReturnType()).isEqualTo(boolean.class); + } + + @Test + public void testCurrentBatchMethodExists() throws NoSuchMethodException { + Method method = BatchReader.class.getMethod("currentBatch"); + assertThat(method).isNotNull(); + assertThat(method.getReturnType().getSimpleName()).isEqualTo("ColumnarBatch"); + } + + @Test + public void testCloseMethodExists() throws NoSuchMethodException { + Method method = BatchReader.class.getMethod("close"); + assertThat(method).isNotNull(); Review Comment: ```suggestion ``` ########## docs/source/contributor-guide/iceberg_public_api.md: ########## @@ -0,0 +1,342 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Public API for Apache Iceberg Integration + +This document describes the Comet classes and methods that form the public API used by +[Apache Iceberg](https://iceberg.apache.org/). These APIs enable Iceberg to leverage Comet's +native Parquet reader for vectorized reads in Spark. + +**Important**: Changes to these APIs may break Iceberg's Comet integration. Contributors should +exercise caution when modifying these classes and consider backward compatibility. + +All classes and methods documented here are marked with the `@IcebergApi` annotation +(`org.apache.comet.IcebergApi`) to make them easily identifiable in the source code. 
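To make the marker easier to picture, here is a minimal, hypothetical sketch of how an annotated class and method read in source code. `ExampleReader` is not a real Comet class (the real annotated types live under `org.apache.comet.parquet`); only the annotation's fully qualified name, `org.apache.comet.IcebergApi`, comes from the text above, and its retention policy and permitted targets are not spelled out here.

```java
import org.apache.comet.IcebergApi;

// Hypothetical class, shown only to illustrate how the marker is applied.
@IcebergApi
public class ExampleReader implements AutoCloseable {

  // A method that Iceberg is expected to call, so it carries the marker as well.
  @IcebergApi
  public void setBatchSize(int batchSize) {
    // ... resize internal buffers ...
  }

  @Override
  @IcebergApi
  public void close() {
    // ... release native resources ...
  }
}
```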
+ +## Overview + +Iceberg uses Comet's Parquet reading infrastructure to accelerate table scans. The integration +uses two approaches: + +1. **Hybrid Reader**: Native Parquet decoding with JVM-based I/O (requires building Iceberg + from source with Comet patches) +2. **Native Reader**: Fully-native Iceberg scans using iceberg-rust + +This document focuses on the Hybrid Reader API, which is used when Iceberg is configured with +`spark.sql.iceberg.parquet.reader-type=COMET`. + +## Package: `org.apache.comet.parquet` + +### FileReader + +Main class for reading Parquet files with native decoding. + +```java +// Constructor +public FileReader( + WrappedInputFile inputFile, + ReadOptions options, + Map<String, String> properties, + Long start, + Long length, + byte[] fileEncryptionKey, + byte[] fileAADPrefix +) throws IOException + +// Methods used by Iceberg +public void setRequestedSchemaFromSpecs(List<ParquetColumnSpec> specs) +public RowGroupReader readNextRowGroup() throws IOException +public void skipNextRowGroup() +public void close() throws IOException +``` + +### RowGroupReader + +Provides access to row group data. + +```java +// Methods used by Iceberg +public long getRowCount() +``` + +### ReadOptions + +Configuration for Parquet read operations. + +```java +// Builder pattern +public static Builder builder(Configuration conf) + +public class Builder { + public ReadOptions build() +} +``` + +### WrappedInputFile + +Wrapper that adapts Iceberg's `InputFile` interface to Comet's file reading infrastructure. + +```java +// Constructor +public WrappedInputFile(org.apache.iceberg.io.InputFile inputFile) +``` + +### ParquetColumnSpec + +Specification describing a Parquet column's schema information. + +```java +// Constructor +public ParquetColumnSpec( + int fieldId, + String[] path, + String physicalType, + int typeLength, + boolean isRepeated, + int maxDefinitionLevel, + int maxRepetitionLevel, + String logicalTypeName, + Map<String, String> logicalTypeParams +) + +// Getters used by Iceberg +public int getFieldId() +public String[] getPath() +public String getPhysicalType() +public int getTypeLength() +public int getMaxDefinitionLevel() +public int getMaxRepetitionLevel() +public String getLogicalTypeName() +public Map<String, String> getLogicalTypeParams() +``` + +### AbstractColumnReader + +Base class for column readers. + +```java +// Protected field accessed by Iceberg subclasses +protected long nativeHandle + +// Methods used by Iceberg +public void setBatchSize(int batchSize) +public void close() +``` + +### ColumnReader + +Column reader for regular Parquet columns (extends `AbstractColumnReader`). + +```java +// Methods used by Iceberg +public void setPageReader(PageReader pageReader) throws IOException +``` + +### BatchReader + +Coordinates reading batches across multiple column readers. + +```java +// Constructor +public BatchReader(AbstractColumnReader[] columnReaders) + +// Methods used by Iceberg +public void setSparkSchema(StructType schema) +public AbstractColumnReader[] getColumnReaders() +public void nextBatch(int batchSize) +``` + +### MetadataColumnReader + +Reader for metadata columns (used for Iceberg's delete and position columns). 
+ +```java +// Constructor +public MetadataColumnReader( + DataType sparkType, + ColumnDescriptor descriptor, + boolean useDecimal128, + boolean isConstant +) + +// Methods used by Iceberg +public void readBatch(int total) +public CometVector currentBatch() + +// Protected field accessed by subclasses +protected long nativeHandle +``` + +### ConstantColumnReader + +Reader for columns with constant/default values (extends `MetadataColumnReader`). + +```java +// Constructor +public ConstantColumnReader( + DataType sparkType, + ColumnDescriptor descriptor, + Object value, + boolean useDecimal128 +) +``` + +### Native + +JNI interface for native operations. + +```java +// Static methods used by Iceberg +public static void resetBatch(long nativeHandle) +public static void setIsDeleted(long nativeHandle, boolean[] isDeleted) +public static void setPosition(long nativeHandle, long position, int total) +``` + +### TypeUtil + +Utilities for Parquet type conversions. + +```java +// Methods used by Iceberg +public static ColumnDescriptor convertToParquet(StructField sparkField) +``` + +### Utils + +General utility methods. + +```java +// Methods used by Iceberg +public static AbstractColumnReader getColumnReader( + DataType sparkType, + ColumnDescriptor descriptor, Review Comment: The parameter types do not match with https://github.com/apache/datafusion-comet/pull/3237/changes#diff-54de18a6f3ec3c2944f1628012f8c0b0af863da30419a8bc989eb6cd8ccb8cd1R39-R46 ```suggestion ParquetColumnSpec columnSpec, ```
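Stepping back from the individual comments, here is a rough, hypothetical sketch of how the Hybrid Reader surface documented in this PR (the path selected by `spark.sql.iceberg.parquet.reader-type=COMET`) could be wired together end to end. It uses only the constructors and methods listed in the proposed document, plus the `ColumnarBatch` return type asserted by the new `BatchReaderApiTest`. `HybridReadSketch`, `icebergInputFile`, `specs`, and `readers` are placeholder names, `readNextRowGroup()` is assumed to return `null` once the row groups are exhausted (as in parquet-mr), and the Iceberg-side glue that feeds page data into each column reader is deliberately left out because it is not part of this API surface.

```java
import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.io.InputFile;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarBatch;

import org.apache.comet.parquet.AbstractColumnReader;
import org.apache.comet.parquet.BatchReader;
import org.apache.comet.parquet.FileReader;
import org.apache.comet.parquet.ParquetColumnSpec;
import org.apache.comet.parquet.ReadOptions;
import org.apache.comet.parquet.RowGroupReader;
import org.apache.comet.parquet.WrappedInputFile;

public class HybridReadSketch {

  /** Placeholder driver; in practice every argument is produced by Iceberg-side code. */
  static void readSplit(
      InputFile icebergInputFile,
      Configuration conf,
      Map<String, String> properties,
      long start,
      long length,
      List<ParquetColumnSpec> specs,
      AbstractColumnReader[] readers,
      StructType sparkSchema,
      int batchSize)
      throws IOException {

    ReadOptions options = ReadOptions.builder(conf).build();

    // Wrap Iceberg's InputFile so Comet's JVM-side I/O can read this split.
    FileReader fileReader =
        new FileReader(
            new WrappedInputFile(icebergInputFile),
            options,
            properties,
            start,
            length,
            null, // fileEncryptionKey: unencrypted file in this sketch
            null); // fileAADPrefix
    try {
      // Tell the native decoder which columns to materialize.
      fileReader.setRequestedSchemaFromSpecs(specs);

      BatchReader batchReader = new BatchReader(readers);
      batchReader.setSparkSchema(sparkSchema);
      for (AbstractColumnReader reader : readers) {
        reader.setBatchSize(batchSize);
      }

      RowGroupReader rowGroup;
      // Assumption: readNextRowGroup() returns null when no row groups remain.
      while ((rowGroup = fileReader.readNextRowGroup()) != null) {
        long remaining = rowGroup.getRowCount();
        // Feeding this row group's pages into each column reader (e.g. via
        // ColumnReader.setPageReader) is Iceberg-side glue and is omitted here.
        while (remaining > 0) {
          int size = (int) Math.min(batchSize, remaining);
          batchReader.nextBatch(size);
          ColumnarBatch batch = batchReader.currentBatch();
          // ... hand `batch` to the downstream consumer ...
          remaining -= size;
        }
      }
    } finally {
      fileReader.close();
    }
  }
}
```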
