vikrambohra commented on code in PR #3527:
URL: https://github.com/apache/gobblin/pull/3527#discussion_r921443301
##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -792,6 +800,9 @@ public void flush(String dbName, String tableName) throws IOException {
String topic = props.get(TOPIC_NAME_KEY);
if (tableMetadata.appendFiles.isPresent()) {
tableMetadata.appendFiles.get().commit();
+ if (auditWhitelistBlacklist.acceptTable(dbName, tableName)) {
Review Comment:
There is an opportunity to optimize here: we should check whether a table should be
audited when collecting the counts, rather than at publish time.
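A minimal sketch of what this suggestion could look like, assuming the audit filter is applied where each GMCE's audit-count map is buffered, so that tables that will never be audited accumulate nothing and the check in `flush` could be dropped. `TableFilter`, `AuditTableMetadata`, and `AuditCountCollector` are hypothetical stand-ins, not Gobblin classes; `TableFilter.acceptTable` only mirrors the `auditWhitelistBlacklist.acceptTable(dbName, tableName)` call in the diff.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the audit whitelist/blacklist; mirrors acceptTable(dbName, tableName) in the diff.
interface TableFilter {
  boolean acceptTable(String dbName, String tableName);
}

// Hypothetical, trimmed-down TableMetadata holding only the buffered audit-count maps.
class AuditTableMetadata {
  final List<String> serializedAuditCountMaps = new ArrayList<>();
}

// Hypothetical helper: applies the audit filter at collection time instead of publish time.
class AuditCountCollector {
  private final TableFilter auditFilter;

  AuditCountCollector(TableFilter auditFilter) {
    this.auditFilter = auditFilter;
  }

  // Buffers the GMCE's audit-count map only if the table is audited, so nothing is
  // accumulated for tables that flush() would skip anyway.
  void collect(String dbName, String tableName, AuditTableMetadata metadata, String serializedAuditCountMap) {
    if (auditFilter.acceptTable(dbName, tableName)) {
      metadata.serializedAuditCountMaps.add(serializedAuditCountMap);
    }
  }
}
```

With this shape the filter is consulted once per collected GMCE rather than once per flush; the gain is that audit-count maps are never held in memory for tables that are not audited.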
##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -637,6 +644,7 @@ protected void addFiles(GobblinMetadataChangeEvent gmce, Map<String, Collection<
private Stream<DataFile> getIcebergDataFilesToBeAddedHelper(GobblinMetadataChangeEvent gmce, Table table,
Map<String, Collection<HiveSpec>> newSpecsMap,
TableMetadata tableMetadata) {
+ tableMetadata.serializedAuditCountMaps.add(gmce.getAuditCountMap());
Review Comment:
See my comment on `flush` about collecting audit counts only if the table is
whitelisted.
##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java:
##########
@@ -94,8 +94,13 @@ public GobblinMCEProducer(State state) {
*/
public void sendGMCE(Map<Path, Metrics> newFiles, List<String> oldFiles, List<String> oldFilePrefixes,
Map<String, String> offsetRange, OperationType operationType, SchemaSource schemaSource) throws IOException {
+ sendGMCE(newFiles, oldFiles, oldFilePrefixes, offsetRange, operationType, schemaSource, null);
+ }
+
+ public void sendGMCE(Map<Path, Metrics> newFiles, List<String> oldFiles, List<String> oldFilePrefixes,
+ Map<String, String> offsetRange, OperationType operationType, SchemaSource schemaSource, String serializedAuditMap) throws IOException {
Review Comment:
Rename the parameter to `serializedAuditCountMap`? That would match `serializedAuditCountMaps` in `TableMetadata`.
##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java:
##########
@@ -166,7 +171,7 @@ private void setBasicInformationForGMCE(GobblinMetadataChangeEvent.Builder gmceB
public GobblinMetadataChangeEvent getGobblinMetadataChangeEvent(Map<Path, Metrics> newFiles, List<String> oldFiles,
List<String> oldFilePrefixes, Map<String, String> offsetRange,
OperationType operationType,
- SchemaSource schemaSource) {
+ SchemaSource schemaSource, String serializedAuditMap) {
Review Comment:
Same here: rename to `serializedAuditCountMap`?
##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java:
##########
@@ -94,8 +94,13 @@ public GobblinMCEProducer(State state) {
*/
public void sendGMCE(Map<Path, Metrics> newFiles, List<String> oldFiles, List<String> oldFilePrefixes,
Map<String, String> offsetRange, OperationType operationType, SchemaSource schemaSource) throws IOException {
+ sendGMCE(newFiles, oldFiles, oldFilePrefixes, offsetRange, operationType, schemaSource, null);
+ }
+
Review Comment:
Add Javadoc for the new overload that documents the new parameter.
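A possible shape for that Javadoc, as a sketch only. `GmceProducerSketch` is a hypothetical, simplified stand-in for `GobblinMCEProducer`: parameter types are reduced to JDK types (the real methods take `Map<Path, Metrics>`, `OperationType`, and `SchemaSource`), the parameter uses the `serializedAuditCountMap` name suggested above, and the description of the new parameter is inferred from the diff (`gmce.getAuditCountMap()`) rather than taken from the PR.

```java
import java.io.IOException;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for GobblinMCEProducer; parameter types are reduced to JDK types.
class GmceProducerSketch {

  // Records the audit-count map seen by the last call; for illustration only.
  String lastSerializedAuditCountMap = "unset";

  /**
   * Sends a GMCE without audit counts.
   *
   * <p>Delegates to {@link #sendGMCE(Map, List, List, Map, String, String, String)} with a
   * {@code null} {@code serializedAuditCountMap}.
   */
  public void sendGMCE(Map<String, Long> newFiles, List<String> oldFiles, List<String> oldFilePrefixes,
      Map<String, String> offsetRange, String operationType, String schemaSource) throws IOException {
    sendGMCE(newFiles, oldFiles, oldFilePrefixes, offsetRange, operationType, schemaSource, null);
  }

  /**
   * Sends a GMCE that optionally carries audit counts.
   *
   * <p>The remaining {@code @param} tags would be the same as on the existing overload.
   *
   * @param serializedAuditCountMap serialized audit-count map to attach to the GMCE,
   *     or {@code null} if there are no audit counts
   * @throws IOException if sending the GMCE fails
   */
  public void sendGMCE(Map<String, Long> newFiles, List<String> oldFiles, List<String> oldFilePrefixes,
      Map<String, String> offsetRange, String operationType, String schemaSource,
      String serializedAuditCountMap) throws IOException {
    // A real implementation would build and publish the event; the sketch only records the argument.
    lastSerializedAuditCountMap = serializedAuditCountMap;
  }
}
```

Keeping the six-argument overload and delegating with `null`, as the diff does, lets existing callers compile unchanged.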
--
This is an automated message from the Apache Git Service.