This is an automated email from the ASF dual-hosted git repository.
vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new a4f9e25457 [INLONG-11100][Sort] The buffer queue is not released after
sending messages to elasticsearch (#11101)
a4f9e25457 is described below
commit a4f9e254574c2e0d8f7a3732ca42940607ec452b
Author: vernedeng <[email protected]>
AuthorDate: Sat Sep 14 10:13:39 2024 +0800
[INLONG-11100][Sort] The buffer queue is not released after sending
messages to elasticsearch (#11101)
---
.../sort/standalone/channel/BufferQueueChannel.java | 16 ++++++++++------
.../standalone/sink/elasticsearch/EsChannelWorker.java | 2 +-
2 files changed, 11 insertions(+), 7 deletions(-)
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/BufferQueueChannel.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/BufferQueueChannel.java
index 16b00e3605..5db46b11ea 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/BufferQueueChannel.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/BufferQueueChannel.java
@@ -36,7 +36,7 @@ import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicLong;
/**
- *
+ *
* BufferQueueChannel
*/
public class BufferQueueChannel extends AbstractChannel {
@@ -45,6 +45,7 @@ public class BufferQueueChannel extends AbstractChannel {
public static final String KEY_MAX_BUFFERQUEUE_SIZE_KB =
"maxBufferQueueSizeKb";
public static final String KEY_RELOADINTERVAL = "reloadInterval";
+ public static final String KEY_TASK_NAME = "taskName";
public static final int DEFAULT_MAX_BUFFERQUEUE_SIZE_KB = 128 * 1024;
// global buffer size
@@ -54,6 +55,7 @@ public class BufferQueueChannel extends AbstractChannel {
protected Timer channelTimer;
private AtomicLong takeCounter = new AtomicLong(0);
private AtomicLong putCounter = new AtomicLong(0);
+ private String taskName;
/**
* Constructor
@@ -66,7 +68,7 @@ public class BufferQueueChannel extends AbstractChannel {
/**
* put
- *
+ *
* @param event
* @throws ChannelException
*/
@@ -88,7 +90,7 @@ public class BufferQueueChannel extends AbstractChannel {
/**
* take
- *
+ *
* @return Event
* @throws ChannelException
*/
@@ -106,7 +108,7 @@ public class BufferQueueChannel extends AbstractChannel {
/**
* getTransaction
- *
+ *
* @return
*/
@Override
@@ -138,7 +140,8 @@ public class BufferQueueChannel extends AbstractChannel {
TimerTask channelTask = new TimerTask() {
public void run() {
- LOG.info("queueSize:{},availablePermits:{},put:{},take:{}",
+
LOG.info("taskName:{},queueSize:{},availablePermits:{},put:{},take:{}",
+ taskName,
bufferQueue.size(),
bufferQueue.availablePermits(),
putCounter.getAndSet(0),
@@ -152,11 +155,12 @@ public class BufferQueueChannel extends AbstractChannel {
/**
* configure
- *
+ *
* @param context
*/
@Override
public void configure(Context context) {
+ this.taskName = context.getString(KEY_TASK_NAME);
}
/**
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsChannelWorker.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsChannelWorker.java
index caa1fbbb86..93a29e0142 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsChannelWorker.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsChannelWorker.java
@@ -106,7 +106,6 @@ public class EsChannelWorker extends Thread {
context.addSendFailMetric();
profileEvent.ack();
}
- tx.commit();
} else {
List<EsIndexRequest> indexRequestList = handler.parse(
context, profileEvent,
context.getTransformProcessor(profileEvent.getUid()));
@@ -117,6 +116,7 @@ public class EsChannelWorker extends Thread {
profileEvent.ack();
}
}
+ tx.commit();
} catch (Throwable t) {
LOG.error("Process event failed!" + this.getName(), t);