vikrambohra commented on code in PR #3527:
URL: https://github.com/apache/gobblin/pull/3527#discussion_r920596714
##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java:
##########
@@ -78,6 +78,8 @@ public class GobblinMCEPublisher extends DataPublisher {
private static final PathFilter HIDDEN_FILES_FILTER = new HiddenFilter();
private static final Metrics DUMMY_METRICS = new Metrics(100000000L, null, null, null, null);
+ public static final String SERIALIZED_AUDIT_MAP_KEY = "serializedAuditMap";
Review Comment:
Rename this to SERIALIZED_AUDIT_COUNTS_MAP_KEY.
##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -1081,6 +1092,9 @@ public void writeEnvelope(RecordEnvelope<GenericRecord> recordEnvelope, Map<Stri
}
}
+ public void sendAuditCounts(String topicName, List<String> serializedAuditCountMaps) {
Review Comment:
Add a Javadoc comment to this method explaining its intent. Also, can we change the parameter type to
Collection<String> instead of List<String>?
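A minimal sketch of what the suggested signature and Javadoc could look like, assuming the method simply forwards each serialized map for the topic. AuditCountSenderSketch, sentFor and the in-memory recording are hypothetical stand-ins for the real sending logic, which this thread does not show.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for IcebergMetadataWriter.sendAuditCounts: it records
// what would be sent instead of emitting anything, so the signature can be exercised.
class AuditCountSenderSketch {
  private final Map<String, List<String>> sentByTopic = new HashMap<>();

  /**
   * Sends the serialized audit count maps collected for a topic.
   *
   * @param topicName topic whose audit counts are being reported
   * @param serializedAuditCountMaps serialized audit count maps; typed as
   *     Collection so callers can pass a List, Set or other collection unchanged,
   *     since the method only needs to iterate over it
   */
  public void sendAuditCounts(String topicName, Collection<String> serializedAuditCountMaps) {
    List<String> sent = sentByTopic.computeIfAbsent(topicName, k -> new ArrayList<>());
    sent.addAll(serializedAuditCountMaps);
  }

  /** Returns everything recorded for the topic, or an empty list if nothing was sent. */
  public List<String> sentFor(String topicName) {
    return sentByTopic.getOrDefault(topicName, Collections.emptyList());
  }
}
```

With Collection<String>, a caller that already holds a Set of serialized maps can pass it directly instead of copying it into a List.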
##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -145,6 +145,8 @@ public class IcebergMetadataWriter implements MetadataWriter {
private final static String DEFAULT_EXPIRE_SNAPSHOTS_LOOKBACK_TIME = "3d";
private static final String ICEBERG_REGISTRATION_BLACKLIST = "iceberg.registration.blacklist";
private static final String ICEBERG_REGISTRATION_WHITELIST = "iceberg.registration.whitelist";
+ private static final String ICEBERG_REGISTRATION_AUDIT_BLACKLIST = "iceberg.registration.audit.blacklist";
Review Comment:
Could this be iceberg.registration.audit.counts.blacklist instead?
##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java:
##########
@@ -99,12 +101,14 @@ public void publishData(Collection<? extends WorkUnitState> states) throws IOExc
newFiles = computeDummyFile(state);
if (!newFiles.isEmpty()) {
log.info("Dummy file: " + newFiles.keySet().iterator().next());
- this.producer.sendGMCE(newFiles, null, null, offsetRange, OperationType.change_property, SchemaSource.NONE);
+ this.producer.sendGMCE(newFiles, null, null, offsetRange, OperationType.change_property, SchemaSource.NONE,
Review Comment:
Audit counts should be sent only for the "add_files" operation type. It's safer to
send no audit counts here.
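A sketch of the gating suggested above, assuming the decision whether to attach counts is made per event from its operation type. GmceOperationType and AuditCountGate are hypothetical; the enum only mirrors the two operation types named in this comment. Under this sketch, the dummy-file call would effectively pass null for the audit counts.

```java
// Hypothetical stand-in for the GMCE operation types named in this review.
enum GmceOperationType { add_files, change_property }

final class AuditCountGate {
  private AuditCountGate() {}

  /**
   * Returns the serialized audit counts to attach to an event, or null for any
   * operation other than add_files (e.g. the change_property dummy-file event).
   */
  static String auditCountsToSend(GmceOperationType operationType, String serializedAuditCountMap) {
    return operationType == GmceOperationType.add_files ? serializedAuditCountMap : null;
  }
}
```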
##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java:
##########
@@ -182,6 +187,9 @@ public GobblinMetadataChangeEvent getGobblinMetadataChangeEvent(Map<Path, Metric
gmceBuilder.setOldFilePrefixes(oldFilePrefixes);
}
gmceBuilder.setOperationType(operationType);
+ if (serializedAuditMap != null) {
Review Comment:
The not-null check seems redundant if the auditCountsMap field is nullable and defaults to null.
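To illustrate why the check could be dropped: assuming the field is declared nullable with a null default (for example an Avro union of null and string), setting it to null leaves the built value exactly as if the setter had never been called. NullableFieldBuilderSketch is a hypothetical model, not the generated GMCE builder.

```java
// Hypothetical builder modeling a nullable field whose default is null
// (as with an Avro union of null and string); not the generated GMCE builder.
final class NullableFieldBuilderSketch {
  private String auditCountMap = null; // default value when the setter is never called

  NullableFieldBuilderSketch setAuditCountMap(String value) {
    this.auditCountMap = value; // null is a legal value here, so callers need no guard
    return this;
  }

  String build() {
    return auditCountMap;
  }
}
```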
##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -637,6 +644,7 @@ protected void addFiles(GobblinMetadataChangeEvent gmce, Map<String, Collection<
private Stream<DataFile> getIcebergDataFilesToBeAddedHelper(GobblinMetadataChangeEvent gmce, Table table,
Map<String, Collection<HiveSpec>> newSpecsMap,
TableMetadata tableMetadata) {
+ tableMetadata.serializedAuditCountMaps.add(gmce.getAuditCountMap());
Review Comment:
Add a null check here, since gmce.getAuditCountMap() is nullable.
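A minimal sketch of the suggested guard. TableMetadataSketch and addAuditCountMap are hypothetical; only the serializedAuditCountMaps list comes from the diff, and String is assumed as its element type. Since ArrayList accepts null, an unguarded add would store a missing map silently.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the writer's per-table TableMetadata; only the
// audit-count list from the diff is modeled.
final class TableMetadataSketch {
  final List<String> serializedAuditCountMaps = new ArrayList<>();

  /**
   * Records the event's serialized audit count map. ArrayList accepts null, so
   * without this guard a missing map would be stored silently and any problem
   * would only surface later, when the list is consumed.
   */
  void addAuditCountMap(String serializedAuditCountMap) {
    if (serializedAuditCountMap != null) {
      serializedAuditCountMaps.add(serializedAuditCountMap);
    }
  }
}
```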
--
This is an automated message from the Apache Git Service.