This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 1548350bd [GOBBLIN-1833]Emit Completeness watermark information in
snapshotCommitEvent (#3696)
1548350bd is described below
commit 1548350bdb3dedc10c66c4951684c75dc230c304
Author: Zihan Li <[email protected]>
AuthorDate: Thu May 18 17:34:02 2023 -0700
[GOBBLIN-1833]Emit Completeness watermark information in
snapshotCommitEvent (#3696)
* address comments
* use connectionmanager when httpclient is not closeable
* [GOBBLIN-1833]Emit Completeness watermark information in
snapshotCommitEvent
* address comments
---------
Co-authored-by: Zihan Li <[email protected]>
---
.../org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
diff --git
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index 1e88a83c7..3135a1243 100644
---
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -788,7 +788,7 @@ public class IcebergMetadataWriter implements
MetadataWriter {
* @return kafka topic name for this table
*/
protected String getTopicName(TableIdentifier tid, TableMetadata
tableMetadata) {
- if (tableMetadata.dataOffsetRange.isPresent()) {
+ if (tableMetadata.dataOffsetRange.isPresent() &&
tableMetadata.dataOffsetRange.get().size() != 0) {
String topicPartitionString =
tableMetadata.dataOffsetRange.get().keySet().iterator().next();
//In case the topic name is not the table name or the topic name
contains '-'
return topicPartitionString.substring(0,
topicPartitionString.lastIndexOf('-'));
@@ -1011,6 +1011,9 @@ public class IcebergMetadataWriter implements
MetadataWriter {
gobblinTrackingEvent.addMetadata(entry.getKey(), entry.getValue());
}
}
+ if (tableMetadata.completenessEnabled) {
+ gobblinTrackingEvent.addMetadata(COMPLETION_WATERMARK_KEY,
Long.toString(tableMetadata.completionWatermark));
+ }
eventSubmitter.submit(gobblinTrackingEvent);
}