This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/release-1.3.0 by this push:
new 31c96a0f3 [INLONG-6076][Audit] Bugs in the ClickHouseService.processOutput() (#6077)
31c96a0f3 is described below
commit 31c96a0f3217ec2c32aecebbc1803feea126139f
Author: Goson Zhang <[email protected]>
AuthorDate: Fri Sep 30 19:51:14 2022 +0800
[INLONG-6076][Audit] Bugs in the ClickHouseService.processOutput() (#6077)
---
.../inlong/audit/service/ClickHouseService.java | 45 +++++++++++-----------
1 file changed, 22 insertions(+), 23 deletions(-)
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java
index 24042d5aa..6d98973a2 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java
@@ -34,6 +34,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
/**
* ClickHouseService
@@ -54,7 +55,7 @@ public class ClickHouseService implements InsertData,
AutoCloseable {
private LinkedBlockingQueue<ClickHouseDataPo> batchQueue;
private AtomicBoolean needBatchOutput = new AtomicBoolean(false);
private AtomicInteger batchCounter = new AtomicInteger(0);
-
+ private AtomicLong lastCheckTime = new
AtomicLong(System.currentTimeMillis());
private Connection conn;
/**
@@ -77,17 +78,11 @@ public class ClickHouseService implements InsertData,
AutoCloseable {
Class.forName(chConfig.getDriver());
this.reconnect();
} catch (Exception e) {
- LOG.error(e.getMessage(), e);
+ LOG.error("ClickHouseService start failure!", e);
}
- // timer
- long currentTime = System.currentTimeMillis();
- // batch output interval
- timerService.scheduleWithFixedDelay(() ->
needBatchOutput.compareAndSet(false, true),
- currentTime + chConfig.getBatchIntervalMs(),
- chConfig.getBatchIntervalMs(), TimeUnit.MILLISECONDS);
- // batch output process
- timerService.scheduleWithFixedDelay(() -> processOutput(),
- currentTime + chConfig.getProcessIntervalMs(),
+ // start timer
+ timerService.scheduleWithFixedDelay(this::processOutput,
+ chConfig.getProcessIntervalMs(),
chConfig.getProcessIntervalMs(), TimeUnit.MILLISECONDS);
}
@@ -95,14 +90,15 @@ public class ClickHouseService implements InsertData,
AutoCloseable {
* processOutput
*/
private void processOutput() {
- if (!this.needBatchOutput.get()) {
+ if (!this.needBatchOutput.get()
+ && (System.currentTimeMillis() - lastCheckTime.get() <
chConfig.getBatchIntervalMs())) {
return;
}
// output
try (PreparedStatement pstat = this.conn.prepareStatement(INSERT_SQL))
{
- // insert data
- ClickHouseDataPo data = this.batchQueue.poll();
int counter = 0;
+ // output data to clickhouse
+ ClickHouseDataPo data = this.batchQueue.poll();
while (data != null) {
pstat.setString(1, data.getIp());
pstat.setString(2, data.getDockerId());
@@ -124,20 +120,23 @@ public class ClickHouseService implements InsertData,
AutoCloseable {
this.conn.commit();
counter = 0;
}
+ data = this.batchQueue.poll();
}
- this.batchCounter.set(0);
- pstat.executeBatch();
- this.conn.commit();
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
+ if (counter > 0) {
+ pstat.executeBatch();
+ this.conn.commit();
+ }
+ } catch (Exception e1) {
+ LOG.error("Execute output to clickhouse failure!", e1);
+ // re-connect clickhouse
try {
this.reconnect();
- } catch (SQLException e1) {
- LOG.error(e1.getMessage(), e1);
+ } catch (SQLException e2) {
+ LOG.error("Re-connect clickhouse failure!", e2);
}
}
-
- // recover
+ // recover flag
+ lastCheckTime.set(System.currentTimeMillis());
this.needBatchOutput.compareAndSet(true, false);
}