This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 33cadc5b498 HIVE-28306: Iceberg: Return new scan after applying column project parameter (Butao Zhang, reviewed by Denys Kuzmenko)
33cadc5b498 is described below
commit 33cadc5b498b57f779cddc9bf4e3f8aef2d9f6dc
Author: Butao Zhang <[email protected]>
AuthorDate: Tue Jun 11 18:59:42 2024 +0800
HIVE-28306: Iceberg: Return new scan after applying column project parameter (Butao Zhang, reviewed by Denys Kuzmenko)
Closes #5282
---
.../iceberg/mr/mapreduce/IcebergInputFormat.java | 22 +++++++++++++---------
.../mr/mapreduce/IcebergInternalRecordWrapper.java | 2 +-
2 files changed, 14 insertions(+), 10 deletions(-)
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index 566832bedfb..efb3988688d 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -60,7 +60,6 @@ import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.Scan;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.SystemConfigs;
@@ -178,14 +177,19 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
Long openFileCost = splitSize > 0 ? splitSize : TableProperties.SPLIT_SIZE_DEFAULT;
scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, String.valueOf(openFileCost));
}
- String schemaStr = conf.get(InputFormatConfig.READ_SCHEMA);
- if (schemaStr != null) {
- scan.project(SchemaParser.fromJson(schemaStr));
- }
-
- String[] selectedColumns = conf.getStrings(InputFormatConfig.SELECTED_COLUMNS);
- if (selectedColumns != null) {
- scan.select(selectedColumns);
+ // TODO: Currently, this projection optimization stored on scan is not being used effectively on Hive side, as
+ // Hive actually uses conf to propagate the projected columns to let the final reader to read the only
+ // projected columns data. See IcebergInputFormat::readSchema(Configuration conf, Table table, boolean
+ // caseSensitive). But we can consider using this projection optimization stored on scan in the future when
+ // needed.
+ Schema readSchema = InputFormatConfig.readSchema(conf);
+ if (readSchema != null) {
+ scan = scan.project(readSchema);
+ } else {
+ String[] selectedColumns = InputFormatConfig.selectedColumns(conf);
+ if (selectedColumns != null) {
+ scan = scan.select(selectedColumns);
+ }
}
// TODO add a filter parser to get rid of Serialization
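For context (illustrative, not part of the patch): the reassignments above matter because Iceberg scans are immutable; project() and select() return a new Scan instead of modifying the one they are called on, so discarding the return value silently drops the projection. A minimal sketch of the pattern, with hypothetical column names:

import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;

public class ScanProjectionSketch {
  static TableScan projectedScan(Table table) {
    TableScan scan = table.newScan();
    // scan.select("id", "name");      // returned scan discarded, projection lost
    scan = scan.select("id", "name");  // keep the new, projected scan
    return scan;
  }
}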
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInternalRecordWrapper.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInternalRecordWrapper.java
index 241c12a2d39..f63516a601a 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInternalRecordWrapper.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInternalRecordWrapper.java
@@ -57,7 +57,7 @@ public class IcebergInternalRecordWrapper implements Record, StructLike {
public IcebergInternalRecordWrapper wrap(StructLike record) {
int idx = 0;
for (Types.NestedField field : readSchema.fields()) {
- int position = fieldToPositionInTableSchema.get(field.name());
+ int position = fieldToPositionInReadSchema.get(field.name());
values[idx] = record.get(position, Object.class);
idx++;
}
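The IcebergInternalRecordWrapper fix reflects the same projection point: a projected record is laid out by the read schema, so field positions must be looked up in the read schema rather than the full table schema. A small illustrative sketch with a hypothetical schema (not taken from the patch):

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class ReadSchemaPositionSketch {
  public static void main(String[] args) {
    Schema tableSchema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.required(2, "data", Types.StringType.get()),
        Types.NestedField.required(3, "ts", Types.TimestampType.withZone()));
    // Projecting only (data, ts): "ts" sits at position 1 in the read schema,
    // while its table-schema position is 2, so indexing a projected record with
    // the table-schema position would read the wrong slot or over-index.
    Schema readSchema = tableSchema.select("data", "ts");
    System.out.println(readSchema.columns());
  }
}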