This is an automated email from the ASF dual-hosted git repository. sidmishra pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/atlas.git
commit 16759fabb1e397169633fff05b4f4599ac3707b6 Author: Radhika Kundam <[email protected]> AuthorDate: Tue May 11 18:00:49 2021 -0700 ATLAS-4164: [Atlas: Spooling] Tables created after spooling are created before the spooled tables when there is multiple frequent restart in kafka brokers Signed-off-by: Sarath Subramanian <[email protected]> (cherry picked from commit 4100684fa3f63cb2a6267ab24051002ce38de017) --- .../org/apache/atlas/notification/spool/IndexManagement.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java b/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java index 28f9c70..adbb8d1 100644 --- a/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java +++ b/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java @@ -98,8 +98,9 @@ public class IndexManagement { } public boolean isPending() { - return !indexReader.isEmpty() || - (indexWriter.getCurrent() != null && indexWriter.getCurrent().getLine() > 0); + return !indexReader.isEmpty() + || (indexWriter.getCurrent() != null && indexWriter.getCurrent().isStatusWriteInProgress()) + || (indexReader.currentIndexRecord != null && indexReader.currentIndexRecord.getStatus() == IndexRecord.STATUS_READ_IN_PROGRESS); } public synchronized DataOutput getSpoolWriter() throws IOException { @@ -146,6 +147,8 @@ public class IndexManagement { public void update(IndexRecord record) { this.indexFileManager.updateIndex(record); + + LOG.info("this.indexFileManager.updateIndex: {}", record.getLine()); } public void flushSpoolWriter() throws IOException { @@ -349,6 +352,9 @@ public class IndexManagement { public IndexRecord next() throws InterruptedException { this.currentIndexRecord = blockingQueue.poll(retryDestinationMS, TimeUnit.MILLISECONDS); + if (this.currentIndexRecord != null) { + this.currentIndexRecord.setStatus(IndexRecord.STATUS_READ_IN_PROGRESS); + } return this.currentIndexRecord; }
