DimitrisStaratzis opened a new issue, #7556:
URL: https://github.com/apache/iceberg/issues/7556

   ### Apache Iceberg version
   
   1.2.1 (latest release)
   
   ### Query engine
   
   Other
   
   ### Please describe the bug 🐞
   
   Hi everyone, 
   
   I have created the following code that reproduces an issue that I am facing. 
I am trying to create a simple table using the JAVA API with one int column 
named: log_level. In my implementation the column can take values randomly from 
0 to 4. The data file format I am using is parquet. The ```batches``` variable 
determines how many files will be written. There is also the 
```recordsPerBatch``` which determines the number of records per batch. To read 
back my data I use the ```VectorizedTableScanIterable``` object which is just a 
wrapper of the apache iceberg ```ArrowReader```. Now the problem is that when I 
run the following code, and I apply a simple filter, in this case ```log_level 
> 2``` the results come back unfiltered. However if I change:
   ```builder.add(record.copy(ImmutableMap.of("log_level", 
rand.nextInt(5))));``` to 
   ```builder.add(record.copy(ImmutableMap.of("log_level", j % 5)));``` which 
will essentially store the same value in each file, because ```j``` is the 
current file (batch) the filtering works. So essentially, in my understanding, 
filtering only works on files rather than individual rows. Is this the expected 
behavior or am I missing something? Also, when I use the commented part of the 
code below, filtering works row by row as I would expect. 
   
   here is the example I have written to reproduce the issue:
   
   ```package org.example;
   
   import com.google.common.collect.ImmutableList;
   import com.google.common.collect.ImmutableMap;
   import org.apache.commons.io.FileUtils;
   import org.apache.iceberg.DataFile;
   import org.apache.iceberg.PartitionSpec;
   import org.apache.iceberg.Schema;
   import org.apache.iceberg.Table;
   import org.apache.iceberg.TableScan;
   import org.apache.iceberg.arrow.vectorized.ColumnVector;
   import org.apache.iceberg.arrow.vectorized.ColumnarBatch;
   import org.apache.iceberg.arrow.vectorized.VectorizedTableScanIterable;
   import org.apache.iceberg.catalog.TableIdentifier;
   import org.apache.iceberg.data.GenericRecord;
   import org.apache.iceberg.data.IcebergGenerics;
   import org.apache.iceberg.data.Record;
   import org.apache.iceberg.data.parquet.GenericParquetWriter;
   import org.apache.iceberg.expressions.Expressions;
   import org.apache.iceberg.hadoop.HadoopCatalog;
   import org.apache.iceberg.io.CloseableIterable;
   import org.apache.iceberg.io.DataWriter;
   import org.apache.iceberg.io.OutputFile;
   import org.apache.iceberg.parquet.Parquet;
   import org.apache.iceberg.types.Types;
   import org.apache.hadoop.conf.Configuration;
   
   import java.io.File;
   import java.util.Iterator;
   import java.util.Map;
   import java.util.Random;
   
   public class Example {
       private static final int batches = 5;
       private static final int recordsPerBatch = 20;
   
       private static final String tableName = "iceberg_example";
   
       public static void main(String[] args) throws Exception {
           deleteIfExists();
           Table table = createLogArray();
           filteredScan(table);
   
       }
   
       public static Table createLogArray() throws Exception {
           Configuration conf = new Configuration();
           conf.set("fs.defaultFS", "file:///");
           String warehousePath = ".";
           HadoopCatalog catalog = new HadoopCatalog(conf, warehousePath);
   
           Schema schema = new Schema(
                   Types.NestedField.required(1, "log_level", 
Types.IntegerType.get())
           );
   
           PartitionSpec spec = PartitionSpec.builderFor(schema).build();
   
           // Create a table with the given schema
           TableIdentifier name = TableIdentifier.of(tableName);
           Map<String, String> properties = 
ImmutableMap.of("write.parquet.page-size-bytes", 1000 + ""); // set the page 
size to 1MB
   
           Table table = catalog.createTable(name, schema, spec, properties);
   
           // write data in batches
           Random rand = new Random();
           for (int j = 0; j < batches; j++) {
               // generate data
               GenericRecord record = GenericRecord.create(schema);
               ImmutableList.Builder<GenericRecord> builder = 
ImmutableList.builder();
               for (int i = 0; i < recordsPerBatch; i++) {
                   // create a random record
                   builder.add(record.copy(ImmutableMap.of("log_level", 
rand.nextInt(5))));
               }
   
               String filepath = table.location() + "/" + "file_" + j + 
".parquet"; //uuid not necessary
               OutputFile file = table.io().newOutputFile(filepath);
               DataWriter<GenericRecord> dataWriter =
                       Parquet.writeData(file)
                               .schema(schema)
                               
.createWriterFunc(GenericParquetWriter::buildWriter)
                               .overwrite()
                               .withSpec(PartitionSpec.unpartitioned())
                               .build();
   
               try {
                   for (GenericRecord rec : builder.build()) {
                       dataWriter.write(rec);
                   }
               } finally {
                   dataWriter.close();
               }
   
               DataFile dataFile = dataWriter.toDataFile();
   
               table.newAppend().appendFile(dataFile).commit();
           }
           catalog.close();
           return table;
       }
   
   
       public static void filteredScan(Table table) throws Exception {
   
           TableScan scan = 
table.newScan().select("log_level").filter(Expressions.greaterThan("log_level", 
2));
           VectorizedTableScanIterable vectorizedTableScanIterable = new 
VectorizedTableScanIterable(scan, (recordsPerBatch * batches), true);
           Iterator<ColumnarBatch> iterator = 
vectorizedTableScanIterable.iterator();
           while (iterator.hasNext()) {
               ColumnarBatch batch = iterator.next();
               ColumnVector att = batch.column(0);
               for (int i = 0; i < batch.numRows(); i++) {
                   int value = att.getInt(i);
                   System.out.println(value);
               }
           }
   //        CloseableIterable<Record> result = IcebergGenerics.read(table)
   //                .select("log_level")
   //                .where(Expressions.greaterThan("log_level", 2))
   //                .build();
   //        for (Record record : result) {
   //            System.out.println(record.get(0));
   //        }
   //        result.close();
       }
   
       public static void deleteIfExists() throws Exception {
           File directory = new File(tableName);
           if (directory.exists()) {
               if (directory.isDirectory()) {
                   try {
                       FileUtils.deleteDirectory(directory);
                       System.out.println("Directory deleted successfully.");
                   } catch (Exception e) {
                       System.out.println("Failed to delete directory: " + 
e.getMessage());
                   }
               } else {
                   System.out.println("File with name " + tableName + " exists 
but is not a directory.");
               }
           } else {
               System.out.println("Directory with name " + tableName + " does 
not exist.");
           }
       }
   }
   ```
   
   Thank you in advance!
   
   Best, 
   Dimitris


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to