aloyszhang commented on code in PR #10095:
URL: https://github.com/apache/inlong/pull/10095#discussion_r1582061654
##########
inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java:
##########
@@ -115,6 +117,63 @@ public OffsetProfile getOffset(String taskId, String
instanceId) {
return offsetDb.getOffset(taskId, instanceId);
}
+ private void cleanDbOffset() {
+ List<OffsetProfile> offsets = offsetDb.listAllOffsets();
+ offsets.forEach(offset -> {
+ String taskId = offset.getTaskId();
+ String instanceId = offset.getInstanceId();
+ InstanceProfile instanceProfile = instanceDb.getInstance(taskId,
instanceId);
+ if (instanceProfile == null) {
+ deleteOffset(taskId, instanceId);
+ LOGGER.info("instance not found, delete offset taskId {}
instanceId {}", taskId,
+ instanceId);
+ }
+ });
+ LOGGER.info("offsetManager running! offsets count {}", offsets.size());
+ }
+
+ private void cleanDbInstance() {
+ AtomicInteger cleanCount = new AtomicInteger();
+ Iterator<InstanceProfile> iterator =
instanceDb.listAllInstances().listIterator();
+ while (iterator.hasNext()) {
+ if (cleanCount.get() > CLEAN_INSTANCE_ONCE_LIMIT) {
+ return;
+ }
+ InstanceProfile instanceFromDb = iterator.next();
+ String taskId = instanceFromDb.getTaskId();
+ String instanceId = instanceFromDb.getInstanceId();
+ if (instanceFromDb.getState() != InstanceStateEnum.FINISHED) {
+ return;
Review Comment:
Should we also check the state of following instances?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]