autumnust commented on a change in pull request #3398:
URL: https://github.com/apache/gobblin/pull/3398#discussion_r711355246
##########
File path:
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
##########
@@ -861,11 +861,13 @@ public void flush(String dbName, String tableName) throws IOException {
}
/**
+ * NOTE: cw for a window [t1, t2] is marked as t2 if audit counts match
Review comment:
Let's avoid abbreviations like `cw` in the comments.
##########
File path:
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
##########
@@ -729,9 +729,9 @@ private StructLike addLatePartitionValueToIcebergTable(Table table, TableMetadat
private int isLate(String datepartition, long previousWatermark) {
ZonedDateTime partitionDateTime = ZonedDateTime.parse(datepartition, HOURLY_DATEPARTITION_FORMAT);
long partitionEpochTime = partitionDateTime.toInstant().toEpochMilli();
- if(partitionEpochTime > previousWatermark) {
+ if(partitionEpochTime >= previousWatermark) {
return 0;
- } else if(partitionEpochTime <= previousWatermark && partitionDateTime.toLocalDate().equals(getDateFromEpochMillis(previousWatermark))) {
+ } else if(partitionEpochTime < previousWatermark && partitionDateTime.toLocalDate().equals(getDateFromEpochMillis(previousWatermark))) {
Review comment:
This line seems way too long to me. Did you apply the style file in your IDE?
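For reference, a wrapped form of the new condition could look like the sketch below. It is a standalone illustration, not the PR's code: the class name `LatenessSketch`, the UTC zone, the `yyyy-MM-dd-HH` pattern, the body of `getDateFromEpochMillis`, and the return codes 1 and 2 are all assumptions, since the hunk only shows `return 0`.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

class LatenessSketch {
  // Assumption: hourly partitions look like "2021-09-14-08" and are read as UTC.
  static final ZoneId ZONE = ZoneId.of("UTC");
  static final DateTimeFormatter HOURLY_DATEPARTITION_FORMAT =
      DateTimeFormatter.ofPattern("yyyy-MM-dd-HH").withZone(ZONE);

  // Assumption: the real helper may resolve the date in a different zone.
  static LocalDate getDateFromEpochMillis(long epochMillis) {
    return Instant.ofEpochMilli(epochMillis).atZone(ZONE).toLocalDate();
  }

  static int isLate(String datepartition, long previousWatermark) {
    ZonedDateTime partitionDateTime =
        ZonedDateTime.parse(datepartition, HOURLY_DATEPARTITION_FORMAT);
    long partitionEpochTime = partitionDateTime.toInstant().toEpochMilli();
    if (partitionEpochTime >= previousWatermark) {
      // At or after the watermark: not late.
      return 0;
    } else if (partitionEpochTime < previousWatermark
        && partitionDateTime.toLocalDate().equals(getDateFromEpochMillis(previousWatermark))) {
      // Before the watermark but on the same day (return code assumed).
      return 1;
    }
    // Before the watermark and on an earlier day (return code assumed).
    return 2;
  }

  public static void main(String[] args) {
    long watermark =
        ZonedDateTime.of(2021, 9, 14, 8, 0, 0, 0, ZONE).toInstant().toEpochMilli();
    // A partition exactly at the watermark is treated as on time under `>=`.
    System.out.println(isLate("2021-09-14-08", watermark));
    System.out.println(isLate("2021-09-14-07", watermark));
    System.out.println(isLate("2021-09-13-23", watermark));
  }
}
```

Breaking before `&&` keeps each line well under a typical 120-column limit and makes the boundary change from `>` to `>=` easy to spot.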
##########
File path:
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
##########
@@ -895,9 +897,9 @@ private long computeCompletenessWatermark(String table, SortedSet<ZonedDateTime>
while (iterator.hasNext()) {
ZonedDateTime timestampDT = iterator.next();
if (timestampDT.isAfter(prevWatermarkDT)
- && TimeIterator.durationBetween(prevWatermarkDT, now, granularity) > 1) {
+ && TimeIterator.durationBetween(prevWatermarkDT, now, granularity) > 0) {
long timestampMillis = timestampDT.toInstant().toEpochMilli();
- if(auditCountVerifier.get().isComplete(table, timestampMillis, TimeIterator.inc(timestampDT, granularity, 1).toInstant().toEpochMilli())) {
+ if(auditCountVerifier.get().isComplete(table, TimeIterator.dec(timestampDT, granularity, 1).toInstant().toEpochMilli(), timestampMillis)) {
Review comment:
There's no blank between `if` and `(`, and the line is too long.
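To make the formatting point concrete, here is a rough standalone sketch of the changed check, in which the verified window ends at the candidate timestamp rather than starting at it. Everything outside the diff is an assumption: the `CompletenessVerifier` interface is a hypothetical stand-in for the audit count verifier, the granularity is fixed to one hour, `ChronoUnit.HOURS.between` and `minusHours(1)` stand in for `TimeIterator.durationBetween` and `TimeIterator.dec`, and the loop is assumed to walk timestamps newest first and stop at the first complete window.

```java
import java.time.Instant;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.SortedSet;

class WatermarkSketch {
  /** Hypothetical stand-in; only the call shape is taken from the diff. */
  interface CompletenessVerifier {
    boolean isComplete(String table, long beginInMillis, long endInMillis);
  }

  /**
   * Returns the end of the newest complete hourly window after the previous
   * watermark, or the previous watermark if no such window is complete.
   * Assumes {@code timestamps} is sorted newest first.
   */
  static long computeCompletenessWatermark(String table, SortedSet<ZonedDateTime> timestamps,
      long previousWatermark, ZonedDateTime now, CompletenessVerifier verifier) {
    ZonedDateTime prevWatermarkDT =
        Instant.ofEpochMilli(previousWatermark).atZone(now.getZone());
    for (ZonedDateTime timestampDT : timestamps) {
      if (timestampDT.isAfter(prevWatermarkDT)
          && ChronoUnit.HOURS.between(prevWatermarkDT, now) > 0) {
        long timestampMillis = timestampDT.toInstant().toEpochMilli();
        // The window is [timestamp - 1 hour, timestamp], so the watermark is its end.
        long windowStartMillis = timestampDT.minusHours(1).toInstant().toEpochMilli();
        if (verifier.isComplete(table, windowStartMillis, timestampMillis)) {
          return timestampMillis;
        }
      } else {
        // Older timestamps cannot advance the watermark (assumed early exit).
        break;
      }
    }
    return previousWatermark;
  }
}
```

Pulling the window start into a local variable keeps the `isComplete` call on one short line and leaves room for the blank after `if`.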