[ https://issues.apache.org/jira/browse/GOBBLIN-1533?focusedWorklogId=648137&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-648137 ]
ASF GitHub Bot logged work on GOBBLIN-1533:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 08/Sep/21 18:30
Start Date: 08/Sep/21 18:30
Worklog Time Spent: 10m
Work Description: vikrambohra commented on a change in pull request #3385:
URL: https://github.com/apache/gobblin/pull/3385#discussion_r704673159
##########
File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
##########
@@ -616,23 +689,75 @@ protected void addFiles(GobblinMetadataChangeEvent gmce,
Map<String, Collection<
/**
* Method to get dataFiles with metrics information
* This method is used to get files to be added to iceberg
+ * if completeness is enabled a new field (late) is added to table schema
and partition spec
+ * computed based on datepartition and completion watermark
* This method will call method {IcebergUtils.getIcebergDataFileWithMetric}
to get DataFile for specific file path
*/
- private Set<DataFile>
getIcebergDataFilesToBeAdded(List<org.apache.gobblin.metadata.DataFile> files,
+ private Set<DataFile> getIcebergDataFilesToBeAdded(Table table,
TableMetadata tableMetadata, GobblinMetadataChangeEvent gmce,
List<org.apache.gobblin.metadata.DataFile> files,
PartitionSpec partitionSpec, Map<String, Collection<HiveSpec>>
newSpecsMap, Map<Integer, Integer> schemaIdMap) {
Set<DataFile> dataFiles = new HashSet<>();
for (org.apache.gobblin.metadata.DataFile file : files) {
try {
- StructLike partition = getIcebergPartitionVal(newSpecsMap.get(new
Path(file.getFilePath()).getParent().toString()),
- file.getFilePath(), partitionSpec);
- dataFiles.add(IcebergUtils.getIcebergDataFileWithMetric(file,
partitionSpec, partition, conf, schemaIdMap));
+ Collection<HiveSpec> hiveSpecs = newSpecsMap.get(new
Path(file.getFilePath()).getParent().toString());
+ StructLike partition = getIcebergPartitionVal(hiveSpecs,
file.getFilePath(), partitionSpec);
+
+ if(tableMetadata.completenessEnabled && gmce.getOperationType() ==
OperationType.add_files) {
+ // Assumes first partition value to be partitioned by date
+ // TODO Find better way to determine a partition value
+ String datepartition = partition.get(0, null);
+ partition = addLatePartitionValueToIcebergTable(table, tableMetadata,
+ hiveSpecs.iterator().next().getPartition().get(), datepartition);
+
tableMetadata.datePartitions.add(getEpochMillisFromDatepartitionString(datepartition));
+ }
+ dataFiles.add(IcebergUtils.getIcebergDataFileWithMetric(file,
table.spec(), partition, conf, schemaIdMap));
} catch (Exception e) {
log.warn("Cannot get DataFile for {} dur to {}", file.getFilePath(),
e);
}
}
return dataFiles;
}
+ /**
+ * 1. Add "late" partition column to iceberg table if not exists
+ * 2. compute "late" partition value based on datepartition and completion
watermark
+ * @param table
+ * @param tableMetadata
+ * @param hivePartition
+ * @param datepartition
+ * @return new iceberg partition value for file
+ */
+ private StructLike addLatePartitionValueToIcebergTable(Table table,
TableMetadata tableMetadata, HivePartition hivePartition, String datepartition)
{
+ table = addPartitionToIcebergTable(table, newPartitionColumn,
newPartitionColumnType);
Review comment:
I think it makes sense for it to be here. I will remove the one during
table creation.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 648137)
Time Spent: 2h 40m (was: 2.5h)
> Add Completeness watermark to iceberg table
> -------------------------------------------
>
> Key: GOBBLIN-1533
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1533
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Vikram Bohra
> Priority: Major
> Time Spent: 2h 40m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)