ahmedabu98 commented on issue #33497:
URL: https://github.com/apache/beam/issues/33497#issuecomment-2580542191
Okay, when I read the data files like this, I do see the bad data:
<details>
<summary>Code snippet</summary>
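```java
// Imports assumed by this snippet; `table` (an Iceberg Table) and `iceSchema`
// (presumably the projected Iceberg schema) are defined outside of it.
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.data.IdentityPartitionConverters;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.encryption.InputFilesDecryptor;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.PartitionUtil;

FileIO io = table.io();
EncryptionManager encryptionManager = table.encryption();
TableScan scan = table.newScan();
for (CombinedScanTask combinedScanTask : scan.planTasks()) {
  InputFilesDecryptor decryptor =
      new InputFilesDecryptor(combinedScanTask, io, encryptionManager);
  for (FileScanTask fileScanTask : combinedScanTask.tasks()) {
    // Field ID -> partition constant for identity-partitioned columns. The
    // reader returns these constants in place of the values in the data file.
    Map<Integer, ?> idToConstants =
        constantsMap(fileScanTask, IdentityPartitionConverters::convertConstant, iceSchema);
    InputFile inputFile = decryptor.getInputFile(fileScanTask);
    // Close the reader when done (close() may throw IOException).
    try (CloseableIterable<Record> iterable =
        Parquet.read(inputFile)
            .split(fileScanTask.start(), fileScanTask.length())
            .project(table.schema())
            .createReaderFunc(
                fileSchema ->
                    GenericParquetReaders.buildReader(
                        table.schema(), fileSchema, idToConstants))
            .filter(fileScanTask.residual())
            .build()) {
      for (Record rec : iterable) {
        System.out.println("xxx " + rec);
      }
    }
  }
}
// ...

// Returns the identity-partition constants only when the projected schema
// contains at least one identity-partition source column; otherwise an empty map.
private static Map<Integer, ?> constantsMap(
    FileScanTask task,
    BiFunction<Type, Object, Object> converter,
    org.apache.iceberg.Schema schema) {
  PartitionSpec spec = task.spec();
  Set<Integer> idColumns = spec.identitySourceIds();
  org.apache.iceberg.Schema partitionSchema = TypeUtil.select(schema, idColumns);
  boolean projectsIdentityPartitionColumns = !partitionSchema.columns().isEmpty();
  if (projectsIdentityPartitionColumns) {
    return PartitionUtil.constantsMap(task, converter);
  } else {
    return Collections.emptyMap();
  }
}
```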
</details>
The key difference is the additional `idToConstants` map passed to the
reader. I noticed this was added to our Iceberg source recently in #33332, and
AFAICT it replaces the record's column value with the partition value recorded
for that data file in the table metadata. So the data file is indeed part of the
table, but the partition value takes precedence over what is actually in the file.
It seems we are probably setting the wrong partition value.
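To make the suspected failure mode concrete, here is a small, dependency-free Java model. It is hypothetical: `PartitionConstantsModel`, `readRow`, and `mismatchedFieldIds` are illustration names, not Iceberg or Beam APIs. It assumes, as described above, that any field ID present in `idToConstants` is filled from the partition constant rather than from the data file, so a wrong partition value would silently replace the real column value.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
 * Hypothetical model (NOT Iceberg's reader code) of a reader that receives an
 * idToConstants map: for every field ID in the map, the partition constant is
 * returned and the value actually stored in the data file is ignored.
 */
final class PartitionConstantsModel {
  private PartitionConstantsModel() {}

  /** Builds a row keyed by field ID; partition constants win over file values. */
  static Map<Integer, Object> readRow(
      Map<Integer, Object> fileValues, Map<Integer, ?> idToConstants) {
    Map<Integer, Object> row = new HashMap<>(fileValues);
    // Overwrite each identity-partition column with its partition constant.
    row.putAll(idToConstants);
    return row;
  }

  /** Returns (sorted) field IDs whose file value differs from the partition constant. */
  static List<Integer> mismatchedFieldIds(
      Map<Integer, Object> fileValues, Map<Integer, ?> idToConstants) {
    List<Integer> mismatches = new ArrayList<>();
    for (Map.Entry<Integer, ?> e : idToConstants.entrySet()) {
      if (!Objects.equals(fileValues.get(e.getKey()), e.getValue())) {
        mismatches.add(e.getKey());
      }
    }
    mismatches.sort(null); // natural Integer order, for deterministic output
    return mismatches;
  }
}
```

For example, with file values `{1=42L, 2="2024-01-01"}` and constants `{2="2024-01-02"}`, `readRow` yields `"2024-01-02"` for field 2 even though the file holds `"2024-01-01"`, and `mismatchedFieldIds` returns `[2]`. A check in this spirit, comparing values read without `idToConstants` against `task.file().partition()`, could help confirm whether the wrong partition value is being written.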