aloyszhang commented on code in PR #9984:
URL: https://github.com/apache/inlong/pull/9984#discussion_r1565110961
##########
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java:
##########
@@ -41,11 +96,169 @@ public abstract class AbstractSource implements Source {
protected String metricName;
protected Map<String, String> dimensions;
protected static final AtomicLong METRIX_INDEX = new AtomicLong(0);
+ protected volatile boolean runnable = true;
+ protected volatile boolean running = false;
+ protected String taskId;
+ protected String instanceId;
+ protected InstanceProfile profile;
+ private ExtendedHandler extendedHandler;
+ private boolean isRealTime = false;
+ protected volatile long emptyCount = 0;
+ protected int maxPackSize;
+ private static final ThreadPoolExecutor EXECUTOR_SERVICE = new
ThreadPoolExecutor(
+ 0, Integer.MAX_VALUE,
+ 1L, TimeUnit.SECONDS,
+ new SynchronousQueue<>(),
+ new AgentThreadFactory("source-pool"));
+ protected OffsetProfile offsetProfile;
@Override
public void init(InstanceProfile profile) {
+ this.profile = profile;
+ taskId = profile.getTaskId();
+ instanceId = profile.getInstanceId();
inlongGroupId = profile.getInlongGroupId();
inlongStreamId = profile.getInlongStreamId();
+ maxPackSize = profile.getInt(PROXY_PACKAGE_MAX_SIZE,
DEFAULT_PROXY_PACKAGE_MAX_SIZE);
+ queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE);
+ String cycleUnit = profile.get(TASK_CYCLE_UNIT);
+ if (cycleUnit.compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) {
+ isRealTime = true;
+ }
+ initOffset();
+ registerMetric();
+ initExtendHandler();
+ }
+
+ protected void initOffset() {
+ offsetProfile = OffsetManager.getInstance().getOffset(taskId,
instanceId);
+ }
+
+ @Override
+ public void start() {
+ EXECUTOR_SERVICE.execute(run());
+ }
+
+ private Runnable run() {
+ return () -> {
+ AgentThreadFactory.nameThread(getThreadName());
+ running = true;
+ try {
+ doRun();
+ } catch (Throwable e) {
+ LOGGER.error("do run error maybe file deleted: ", e);
+ }
+ running = false;
+ };
+ }
+
+ private void doRun() {
+ long lastPrintTime = 0;
+ while (isRunnable()) {
+ if (!prepareToRead()) {
+ break;
+ }
+ List<SourceData> lines = readFromSource();
+ if (lines != null && lines.isEmpty()) {
+ if (queue.isEmpty()) {
+ emptyCount++;
+ } else {
+ emptyCount = 0;
+ }
+
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT,
BATCH_READ_LINE_TOTAL_LEN);
+ AgentUtils.silenceSleepInSeconds(1);
+ continue;
+ }
+ emptyCount = 0;
+ for (int i = 0; i < lines.size(); i++) {
+ boolean suc4Queue =
waitForPermit(AGENT_GLOBAL_READER_QUEUE_PERMIT, lines.get(i).getData().length);
+ if (!suc4Queue) {
+ break;
+ }
+ putIntoQueue(lines.get(i));
+ }
+
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT,
BATCH_READ_LINE_TOTAL_LEN);
+ if (AgentUtils.getCurrentTime() - lastPrintTime >
CORE_THREAD_PRINT_INTERVAL_MS) {
+ lastPrintTime = AgentUtils.getCurrentTime();
+ printCurrentState();
+ }
+ }
+ }
+
+ protected abstract void printCurrentState();
+
+ /**
+ * Before reading the data source, some preparation operations need to be
done, such as memory control semaphore
+ * application and data source legitimacy verification
+ *
+ * @return true if prepared ok
+ */
+ private boolean prepareToRead() {
+ if (!doPrepareToRead()) {
+ return false;
+ }
+ return waitForPermit(AGENT_GLOBAL_READER_SOURCE_PERMIT,
BATCH_READ_LINE_TOTAL_LEN);
+ }
+
+ /**
+ * Except for applying for memory control semaphores, all other
preparatory work is implemented by this function
+ *
+ * @return true if prepared ok
+ */
+ protected abstract boolean doPrepareToRead();
+
+ /**
+ * After preparation work, we started to truly read data from the data
source
+ *
+ * @return source data list
+ */
+ protected abstract List<SourceData> readFromSource();
+
+ private boolean waitForPermit(String permitName, int permitLen) {
+ boolean suc = false;
+ while (!suc) {
+ suc = MemoryManager.getInstance().tryAcquire(permitName,
permitLen);
+ if (!suc) {
+ MemoryManager.getInstance().printDetail(permitName, "source");
+ if (!isRunnable()) {
+ return false;
+ }
+ AgentUtils.silenceSleepInSeconds(1);
+ }
+ }
+ return true;
+ }
+
+ /**
+ * After preparation work, we started to truly read data from the data
source
+ */
+ private void putIntoQueue(SourceData sourceData) {
+ if (sourceData == null) {
+ return;
+ }
+ try {
+ boolean offerSuc = false;
+ while (isRunnable() && offerSuc != true) {
Review Comment:
```suggestion
while (isRunnable() && !offerSuc) {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]