ZihanLi58 commented on a change in pull request #3426:
URL: https://github.com/apache/gobblin/pull/3426#discussion_r751810712
##########
File path:
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
##########
@@ -1025,11 +1031,13 @@ public void writeEnvelope(RecordEnvelope<GenericRecord> recordEnvelope, Map<Stri
Optional.of(currentOffset - 1);
}
tableMetadataMap.get(tid).setDatasetName(gmce.getDatasetIdentifier().getNativeName());
Review comment:
We want to set those only when the low watermark is not set (which means this is the first time we compute this table since the last failure/restart), so can we move those lines inside the block at line 1028?
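A minimal sketch of the suggested restructuring follows. It uses hypothetical, simplified stand-ins rather than the real `IcebergMetadataWriter` types: `WatermarkInitSketch`, a string table key, and a `datasetNameSets` counter that exists only to make the behavior observable. The one-time setter runs inside the same `if` that initializes the low watermark, so later envelopes for the same table skip it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical, simplified stand-in for the per-table state handling in
 * writeEnvelope; only the fields needed for this sketch are modeled.
 */
final class WatermarkInitSketch {

  static final class TableMetadata {
    // Empty until the first envelope for this table since the last failure/restart.
    Optional<Long> lowWatermark = Optional.empty();
    String datasetName;
    int datasetNameSets = 0; // hypothetical counter, only to observe how often the setter runs

    void setDatasetName(String name) {
      this.datasetName = name;
      this.datasetNameSets++;
    }
  }

  final Map<String, TableMetadata> tableMetadataMap = new HashMap<>();

  /** Handles one envelope for table {@code tid} at offset {@code currentOffset}. */
  void onEnvelope(String tid, long currentOffset, String nativeName) {
    TableMetadata metadata = tableMetadataMap.computeIfAbsent(tid, t -> new TableMetadata());
    if (!metadata.lowWatermark.isPresent()) {
      // First envelope for this table since the last failure/restart:
      // initialize the low watermark and the one-time fields together.
      metadata.lowWatermark = Optional.of(currentOffset - 1);
      metadata.setDatasetName(nativeName);
    }
    // Subsequent envelopes for the same table do not call the setter again.
  }
}
```

If `onEnvelope` is called twice for the same table, `lowWatermark` stays at the first offset minus one and the setter runs only once.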
##########
File path:
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
##########
@@ -1078,6 +1086,10 @@ public void close() throws IOException {
long newCompletenessWatermark = DEFAULT_COMPLETION_WATERMARK;
SortedSet<ZonedDateTime> datePartitions = new TreeSet<>(Collections.reverseOrder());
+ @Setter
+ String dbName;
Review comment:
Why are these two args needed here?
##########
File path:
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
##########
@@ -284,9 +287,8 @@ private Long getAndPersistCurrentWatermark(TableIdentifier tid, String topicPart
information increases the memory footprints, therefore we would like to flush them eagerly).
*/
public void write(GobblinMetadataChangeEvent gmce, Map<String, Collection<HiveSpec>> newSpecsMap,
- Map<String, Collection<HiveSpec>> oldSpecsMap, HiveSpec tableSpec) throws IOException {
- TableIdentifier tid = TableIdentifier.of(tableSpec.getTable().getDbName(), tableSpec.getTable().getTableName());
- TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t -> new TableMetadata());
+ Map<String, Collection<HiveSpec>> oldSpecsMap, HiveSpec tableSpec, TableMetadata tableMetadata) throws IOException {
Review comment:
Can we pass tid directly here instead of tableMetadata?
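A minimal sketch of what passing the identifier could look like follows. It assumes a hypothetical `TableId` record in place of Iceberg's `TableIdentifier` and a trimmed-down `TableMetadata`. `write` resolves the per-table state itself with `Map.computeIfAbsent`, as the removed lines did, so callers only hand over the key.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: the caller passes the table identifier, not the state object. */
final class PassTidSketch {

  /** Hypothetical stand-in for Iceberg's TableIdentifier (db name + table name). */
  record TableId(String dbName, String tableName) {}

  /** Hypothetical, trimmed-down per-table state. */
  static final class TableMetadata {
    int writes = 0;
  }

  final Map<TableId, TableMetadata> tableMetadataMap = new HashMap<>();

  /** Resolves (or lazily creates) the per-table state from the identifier. */
  void write(TableId tid) {
    TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t -> new TableMetadata());
    tableMetadata.writes++;
  }
}
```

Records get value-based `equals`/`hashCode`, so two calls with equal identifiers share one map entry.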
##########
File path:
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
##########
@@ -684,7 +687,8 @@ protected void addFiles(GobblinMetadataChangeEvent gmce, Map<String, Collection<
Collection<HiveSpec> hiveSpecs = newSpecsMap.get(new Path(file.getFilePath()).getParent().toString());
StructLike partition = getIcebergPartitionVal(hiveSpecs, file.getFilePath(), partitionSpec);
- if(tableMetadata.completenessEnabled && gmce.getOperationType() == OperationType.add_files) {
+ if(this.newPartitionTableWhitelistBlacklist.acceptTable(tableMetadata.dbName, tableMetadata.tableName)
+ && gmce.getOperationType() == OperationType.add_files) {
tableMetadata.prevCompletenessWatermark = Long.parseLong(table.properties().getOrDefault(COMPLETION_WATERMARK_KEY,
Review comment:
What will happen if it's not an add_files operation, i.e., what will happen for compaction?
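To make the question concrete, here is a minimal sketch of the condition as written in this hunk. It uses hypothetical stand-ins: an enum mirroring the operation-type names (only `add_files` appears in the diff; the others are assumed), a `Set` in place of `newPartitionTableWhitelistBlacklist`, and an assumed property key and default value. If this path is reached for any operation other than `add_files`, such as a rewrite produced by compaction, the branch is skipped, and the previous completeness watermark is neither read nor updated.

```java
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of the gating condition; not the real IcebergMetadataWriter code. */
final class OperationGateSketch {

  /** Hypothetical mirror of the operation types; only add_files is confirmed by the diff. */
  enum OperationType { add_files, drop_files, rewrite_files, change_property }

  static final long DEFAULT_COMPLETION_WATERMARK = -1L; // assumed sentinel value
  static final String COMPLETION_WATERMARK_KEY = "completion.watermark"; // hypothetical key

  /** Hypothetical stand-in for newPartitionTableWhitelistBlacklist. */
  final Set<String> acceptedTables;

  long prevCompletenessWatermark = DEFAULT_COMPLETION_WATERMARK;

  OperationGateSketch(Set<String> acceptedTables) {
    this.acceptedTables = acceptedTables;
  }

  /** Same shape as the new condition: accepted table AND add_files. */
  void maybeReadCompletenessWatermark(String table, OperationType op, Map<String, String> tableProperties) {
    if (acceptedTables.contains(table) && op == OperationType.add_files) {
      prevCompletenessWatermark = Long.parseLong(
          tableProperties.getOrDefault(COMPLETION_WATERMARK_KEY, String.valueOf(DEFAULT_COMPLETION_WATERMARK)));
    }
    // Any other operation (e.g. rewrite_files) falls through:
    // the completeness watermark is left exactly as it was.
  }
}
```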
--
This is an automated message from the Apache Git Service.