This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 0b0734fe2 [INLONG-5224][Agent][DataProxy] Remove unused
StreamConfigLog related classes (#5230)
0b0734fe2 is described below
commit 0b0734fe2a6f86a452beefdbf7974b5005f9a712
Author: healchow <[email protected]>
AuthorDate: Thu Jul 28 11:10:37 2022 +0800
[INLONG-5224][Agent][DataProxy] Remove unused StreamConfigLog related
classes (#5230)
---
.../org/apache/inlong/agent/plugin/Reader.java | 2 +-
.../apache/inlong/agent/task/TestTaskWrapper.java | 2 +-
.../agent/plugin/sources/TextFileSource.java | 6 +-
.../plugin/sources/reader/AbstractReader.java | 3 +-
.../agent/plugin/sources/reader/BinlogReader.java | 73 ++---
.../agent/plugin/sources/reader/KafkaReader.java | 2 +-
.../agent/plugin/sources/reader/SqlReader.java | 2 +-
.../plugin/sources/reader/TextFileReader.java | 2 +-
.../inlong/common/reporpter/AbstractReporter.java | 191 -------------
.../inlong/common/reporpter/ConfigLogTypeEnum.java | 47 ----
.../apache/inlong/common/reporpter/Response.java | 29 --
.../common/reporpter/StreamConfigLogMetric.java | 140 ----------
.../common/reporpter/StreamConfigLogReporter.java | 40 ---
.../common/reporpter/dto/StreamConfigLogInfo.java | 45 ----
.../common/metric/reporter/ReporterTest.java | 45 ----
inlong-dataproxy/conf/common.properties | 5 -
.../apache/inlong/dataproxy/sink/PulsarSink.java | 295 ++++++++-------------
.../dataproxy/sink/pulsar/PulsarClientService.java | 87 ++----
18 files changed, 160 insertions(+), 856 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Reader.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Reader.java
index 55b768c72..3b5ec83e0 100755
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Reader.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Reader.java
@@ -49,7 +49,7 @@ public interface Reader extends Stage {
* set wait milliseconds when tailing a file
* to solve while loop cause too much cpu usage
*/
- void setWaitMillisecs(long millis);
+ void setWaitMillisecond(long millis);
/**
* get snapshot of the reader
diff --git
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/task/TestTaskWrapper.java
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/task/TestTaskWrapper.java
index 1aad8444a..438db7fc3 100755
---
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/task/TestTaskWrapper.java
+++
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/task/TestTaskWrapper.java
@@ -138,7 +138,7 @@ public class TestTaskWrapper {
}
@Override
- public void setWaitMillisecs(long millis) {
+ public void setWaitMillisecond(long millis) {
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
index d1258685e..215d599f6 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
@@ -30,7 +30,6 @@ import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
@@ -48,10 +47,8 @@ import static
org.apache.inlong.agent.constant.JobConstants.JOB_READ_WAIT_TIMEOU
public class TextFileSource implements Source {
// path + suffix
- public static final String MD5_SUFFIX = ".md5";
private static final Logger LOGGER =
LoggerFactory.getLogger(TextFileSource.class);
private static final String TEXT_FILE_SOURCE_TAG_NAME =
"AgentTextFileSourceMetric";
- private static AtomicLong metricsIndex = new AtomicLong(0);
public TextFileSource() {
}
@@ -68,10 +65,9 @@ public class TextFileSource implements Source {
int seekPosition = jobConf.getInt(file.getAbsolutePath() +
POSITION_SUFFIX, 0);
LOGGER.info("read from history position {} with job profile {},
file absolute path: {}", seekPosition,
jobConf.getInstanceId(), file.getAbsolutePath());
- String md5 = jobConf.get(file.getAbsolutePath() + MD5_SUFFIX, "");
TextFileReader textFileReader = new TextFileReader(file,
seekPosition);
long waitTimeout = jobConf.getLong(JOB_READ_WAIT_TIMEOUT,
DEFAULT_JOB_READ_WAIT_TIMEOUT);
- textFileReader.setWaitMillisecs(waitTimeout);
+ textFileReader.setWaitMillisecond(waitTimeout);
addValidator(filterPattern, textFileReader);
result.add(textFileReader);
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/AbstractReader.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/AbstractReader.java
index e9ed0ac62..50e8fbd9b 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/AbstractReader.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/AbstractReader.java
@@ -26,12 +26,11 @@ import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROU
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
/**
- * abstract reader, init reader and reader metrics
+ * Abstract reader, init reader and reader metrics
*/
public abstract class AbstractReader implements Reader {
protected String inlongGroupId;
-
protected String inlongStreamId;
protected String metricTagName;
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
index 7b7961a48..5e519d731 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
@@ -27,7 +27,6 @@ import org.apache.commons.lang3.tuple.Pair;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.constant.AgentConstants;
-import org.apache.inlong.agent.constant.CommonConstants;
import org.apache.inlong.agent.constant.SnapshotModeConstants;
import org.apache.inlong.agent.message.DefaultMessage;
import org.apache.inlong.agent.plugin.Message;
@@ -39,8 +38,6 @@ import org.apache.inlong.agent.pojo.DebeziumFormat;
import org.apache.inlong.agent.pojo.DebeziumOffset;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.DebeziumOffsetSerializer;
-import org.apache.inlong.common.reporpter.ConfigLogTypeEnum;
-import org.apache.inlong.common.reporpter.StreamConfigLogMetric;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,7 +55,7 @@ import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_MAP_CAPAC
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;
/**
- * read binlog data
+ * Binlog data reader.
*/
public class BinlogReader extends AbstractReader {
@@ -80,7 +77,7 @@ public class BinlogReader extends AbstractReader {
public static final String JOB_DATABASE_PORT = "job.binlogJob.port";
public static final String JOB_DATABASE_QUEUE_SIZE =
"job.binlogJob.queueSize";
private static final Logger LOGGER =
LoggerFactory.getLogger(BinlogReader.class);
- private static final Gson gson = new Gson();
+ private static final Gson GSON = new Gson();
private static final String BINLOG_READER_TAG_NAME = "AgentBinlogMetric";
private final AgentConfiguration agentConf =
AgentConfiguration.getAgentConf();
/**
@@ -104,16 +101,11 @@ public class BinlogReader extends AbstractReader {
private String historyMonitorDdl;
private String instanceId;
private ExecutorService executor;
- private String offset;
private String specificOffsetFile;
private String specificOffsetPos;
private BinlogSnapshotBase binlogSnapshot;
private JobProfile jobProfile;
private boolean destroyed = false;
- private boolean enableReportConfigLog;
- private StreamConfigLogMetric streamConfigLogMetric;
- private String inlongGroupId;
- private String inlongStreamId;
public BinlogReader() {
}
@@ -132,8 +124,7 @@ public class BinlogReader extends AbstractReader {
Pair<String, String> message = binlogMessagesQueue.poll();
Map<String, String> header = new HashMap<>(DEFAULT_MAP_CAPACITY);
header.put(PROXY_KEY_DATA, message.getKey());
- return new
DefaultMessage(message.getValue().getBytes(StandardCharsets.UTF_8),
- header);
+ return new
DefaultMessage(message.getValue().getBytes(StandardCharsets.UTF_8), header);
}
@Override
@@ -150,8 +141,7 @@ public class BinlogReader extends AbstractReader {
offsetFlushIntervalMs =
jobConf.get(JOB_DATABASE_STORE_OFFSET_INTERVAL_MS, "100000");
databaseStoreHistoryName =
jobConf.get(JOB_DATABASE_STORE_HISTORY_FILENAME,
tryToInitAndGetHistoryPath()) + "/history.dat" +
jobConf.getInstanceId();
- offsetStoreFileName = jobConf.get(JOB_DATABASE_STORE_HISTORY_FILENAME,
- tryToInitAndGetHistoryPath()) + "/offset.dat" +
jobConf.getInstanceId();
+
snapshotMode = jobConf.get(JOB_DATABASE_SNAPSHOT_MODE, "");
includeSchemaChanges =
jobConf.get(JOB_DATABASE_INCLUDE_SCHEMA_CHANGES, "false");
historyMonitorDdl = jobConf.get(JOB_DATABASE_HISTORY_MONITOR_DDL,
"false");
@@ -159,75 +149,46 @@ public class BinlogReader extends AbstractReader {
instanceId = jobConf.getInstanceId();
finished = false;
- offset = jobConf.get(JOB_DATABASE_OFFSETS, "");
specificOffsetFile =
jobConf.get(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE, "");
specificOffsetPos =
jobConf.get(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS, "-1");
+
+ offsetStoreFileName = jobConf.get(JOB_DATABASE_STORE_HISTORY_FILENAME,
+ tryToInitAndGetHistoryPath()) + "/offset.dat" +
jobConf.getInstanceId();
binlogSnapshot = new BinlogSnapshotBase(offsetStoreFileName);
+ String offset = jobConf.get(JOB_DATABASE_OFFSETS, "");
binlogSnapshot.save(offset);
- enableReportConfigLog =
-
Boolean.parseBoolean(jobConf.get(StreamConfigLogMetric.CONFIG_LOG_REPORT_ENABLE,
- "true"));
-
- inlongGroupId = jobConf.get(CommonConstants.PROXY_INLONG_GROUP_ID,
- CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID);
- inlongStreamId = jobConf.get(CommonConstants.PROXY_INLONG_STREAM_ID,
- CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID);
- metricTagName = BINLOG_READER_TAG_NAME + "_" + inlongGroupId + "_" +
inlongStreamId;
-
- if (enableReportConfigLog) {
- String reportConfigServerUrl = jobConf
- .get(StreamConfigLogMetric.CONFIG_LOG_REPORT_SERVER_URL,
"");
- String reportConfigLogInterval = jobConf
- .get(StreamConfigLogMetric.CONFIG_LOG_REPORT_INTERVAL,
"60000");
- String clientVersion = jobConf
-
.get(StreamConfigLogMetric.CONFIG_LOG_REPORT_CLIENT_VERSION, "");
- streamConfigLogMetric = new StreamConfigLogMetric(COMPONENT_NAME,
- reportConfigServerUrl,
Long.parseLong(reportConfigLogInterval),
- AgentUtils.getLocalIp(), clientVersion);
- }
+ metricTagName = String.join("_", BINLOG_READER_TAG_NAME,
inlongGroupId, inlongStreamId);
Properties props = getEngineProps();
-
- DebeziumEngine<ChangeEvent<String, String>> engine =
DebeziumEngine.create(
- io.debezium.engine.format.Json.class)
- .using(props)
+ DebeziumEngine<ChangeEvent<String, String>> engine =
DebeziumEngine.create(io.debezium.engine.format.Json.class)
.notifying((records, committer) -> {
try {
for (ChangeEvent<String, String> record : records) {
- DebeziumFormat debeziumFormat = gson
- .fromJson(record.value(),
DebeziumFormat.class);
-
binlogMessagesQueue.put(Pair.of(debeziumFormat.getSource().getTable(),
- record.value()));
+ DebeziumFormat debeziumFormat =
GSON.fromJson(record.value(), DebeziumFormat.class);
+
binlogMessagesQueue.put(Pair.of(debeziumFormat.getSource().getTable(),
record.value()));
committer.markProcessed(record);
}
committer.markBatchFinished();
} catch (Exception e) {
LOGGER.error("parse binlog message error", e);
-
}
-
})
+ .using(props)
.using((success, message, error) -> {
if (!success) {
- LOGGER.error("binlog job with jobConf {} has " +
"error {}",
- jobConf.getInstanceId(), message, error);
- streamConfigLogMetric
- .updateConfigLog(inlongGroupId,
inlongStreamId, "DBConfig",
- ConfigLogTypeEnum.ERROR, error == null
? "" : error.toString());
+ LOGGER.error("error for binlog job: {}, msg: {}",
jobConf.getInstanceId(), message, error);
}
}).build();
executor = Executors.newSingleThreadExecutor();
executor.execute(engine);
- LOGGER.info("get initial snapshot of job {}, snapshot {}",
- jobConf.getInstanceId(), getSnapshot());
+ LOGGER.info("get initial snapshot of job {}, snapshot {}",
jobConf.getInstanceId(), getSnapshot());
}
private Properties getEngineProps() {
Properties props = new Properties();
-
props.setProperty("name", "engine" + instanceId);
props.setProperty("connector.class",
MySqlConnector.class.getCanonicalName());
@@ -316,12 +277,10 @@ public class BinlogReader extends AbstractReader {
@Override
public void setReadTimeout(long mill) {
- return;
}
@Override
- public void setWaitMillisecs(long millis) {
- return;
+ public void setWaitMillisecond(long millis) {
}
@Override
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java
index 93910e99a..18bd19362 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java
@@ -164,7 +164,7 @@ public class KafkaReader<K, V> extends AbstractReader {
}
@Override
- public void setWaitMillisecs(long millis) {
+ public void setWaitMillisecond(long millis) {
waitTimeout = millis;
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java
index 33f4f04a9..880231cb1 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java
@@ -151,7 +151,7 @@ public class SqlReader extends AbstractReader {
}
@Override
- public void setWaitMillisecs(long millis) {
+ public void setWaitMillisecond(long millis) {
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
index 7ec8e8ad1..312dd281e 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
@@ -134,7 +134,7 @@ public class TextFileReader extends AbstractReader {
}
@Override
- public void setWaitMillisecs(long millis) {
+ public void setWaitMillisecond(long millis) {
waitTimeout = millis;
}
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/AbstractReporter.java
b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/AbstractReporter.java
deleted file mode 100644
index cf1f796c4..000000000
---
a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/AbstractReporter.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.common.reporpter;
-
-import com.google.gson.Gson;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.client.config.RequestConfig;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClientBuilder;
-import org.apache.http.util.EntityUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AbstractReporter<T> {
-
- public static final Logger LOGGER =
LoggerFactory.getLogger(AbstractReporter.class);
-
- public static final String AGENT_HTTP_APPLICATION_JSON =
"application/json";
-
- private static final Gson gson = new Gson();
-
- private static final int DEFAULT_CORE_POOL_SIZE = 1;
-
- private static final int DEFAULT_MAX_POOL_SIZE = 2;
-
- private static final int DEFAULT_SYNC_SEND_QUEUE_SIZE = 10000;
-
- private static CloseableHttpClient httpClient;
-
- private final Class<?> clazz = Response.class;
-
- private ThreadPoolExecutor pool;
-
- private String serverUrl;
-
- public AbstractReporter(String serverUrl) {
- this(serverUrl, DEFAULT_CORE_POOL_SIZE, DEFAULT_MAX_POOL_SIZE,
- DEFAULT_SYNC_SEND_QUEUE_SIZE, null);
- }
-
- public AbstractReporter(CloseableHttpClient httpClient, String serverUrl) {
- this(httpClient, serverUrl, DEFAULT_CORE_POOL_SIZE,
DEFAULT_MAX_POOL_SIZE,
- DEFAULT_SYNC_SEND_QUEUE_SIZE, null);
- }
-
- public AbstractReporter(String serverUrl, int corePoolSize, int
maximumPoolsize,
- int syncSendQueueSize,
- RejectedExecutionHandler rejectedExecutionHandler) {
- this.serverUrl = serverUrl;
- if (httpClient == null) {
- RequestConfig requestConfig = RequestConfig.custom().build();
- HttpClientBuilder httpClientBuilder = HttpClientBuilder.create();
- httpClientBuilder.setDefaultRequestConfig(requestConfig);
- httpClient = httpClientBuilder.build();
- }
- if (rejectedExecutionHandler == null) {
- rejectedExecutionHandler = new
ThreadPoolExecutor.DiscardOldestPolicy();
- }
- pool = new ThreadPoolExecutor(corePoolSize, maximumPoolsize,
- 60, TimeUnit.SECONDS, new
LinkedBlockingDeque<>(syncSendQueueSize),
- Executors.defaultThreadFactory(), rejectedExecutionHandler);
- }
-
- public AbstractReporter(CloseableHttpClient httpClient, String serverUrl,
int corePoolSize,
- int maximumPoolSize,
- int syncSendQueueSize,
- RejectedExecutionHandler rejectedExecutionHandler) {
- this(serverUrl, corePoolSize, maximumPoolSize, syncSendQueueSize,
rejectedExecutionHandler);
- this.httpClient = httpClient;
- }
-
- /**
- * Report data by sync.
- */
- public Response syncReportData(T data, String serverUrl) throws Exception {
- if (StringUtils.isEmpty(serverUrl)) {
- LOGGER.warn("Report config log server url is empty, so config log
can not be "
- + "reported!");
- return null;
- }
- HttpPost httpPost = new HttpPost(serverUrl);
- String returnStr = null;
- try {
- StringEntity stringEntity = new StringEntity(gson.toJson(data));
- stringEntity.setContentType(AGENT_HTTP_APPLICATION_JSON);
- httpPost.setEntity(stringEntity);
- returnStr = executeHttpPost(httpPost);
- return parse(returnStr);
- } catch (Exception e) {
- LOGGER.error("syncReportData has exception returnStr = {}, e:",
returnStr, e);
- throw e;
- }
- }
-
- /**
- * Report data by sync.
- */
- public Response syncReportData(T data) throws Exception {
- return this.syncReportData(data, serverUrl);
- }
-
- public String executeHttpPost(HttpPost httpPost) throws Exception {
- CloseableHttpResponse response = httpClient.execute(httpPost);
- if (response == null) {
- return null;
- }
- return EntityUtils.toString(response.getEntity());
- }
-
- /**
- * Report data by async.
- */
- public Future<Response> asyncReportData(T data, String serverUrl) {
- CompletableFuture<Response> completableFuture = new
CompletableFuture<>();
-
- if (pool != null) {
- pool.execute(new RunTask(completableFuture, data, serverUrl));
- } else {
- completableFuture.completeExceptionally(new Exception("Send pool
is null!"));
- }
-
- return completableFuture;
- }
-
- /**
- * Report data by async.
- */
- public Future<Response> asyncReportData(T data) {
- return asyncReportData(data, serverUrl);
- }
-
- /**
- * Parse json data.
- */
- public Response parse(String json) throws Exception {
-
- if (StringUtils.isEmpty(json)) {
- return null;
- }
-
- return gson.fromJson(json, Response.class);
- }
-
- class RunTask implements Runnable {
-
- private CompletableFuture<Response> completableFuture;
-
- private T data;
-
- private String url;
-
- public RunTask(CompletableFuture<Response> completableFuture, T data,
String url) {
- this.completableFuture = completableFuture;
- this.data = data;
- this.url = url;
- }
-
- public void run() {
- try {
- completableFuture.complete(syncReportData(data, url));
- } catch (Exception e) {
- completableFuture.completeExceptionally(e);
- }
- }
- }
-}
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/ConfigLogTypeEnum.java
b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/ConfigLogTypeEnum.java
deleted file mode 100644
index d7d654ad4..000000000
---
a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/ConfigLogTypeEnum.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.common.reporpter;
-
-import static java.util.Objects.requireNonNull;
-
-public enum ConfigLogTypeEnum {
-
- NORMAL(0),ERROR(1);
-
- private int type;
-
- ConfigLogTypeEnum(int type) {
- this.type = type;
- }
-
- public static ConfigLogTypeEnum getOpType(int opType) {
- requireNonNull(opType);
- switch (opType) {
- case 0:
- return NORMAL;
- case 1:
- return ERROR;
- default:
- throw new RuntimeException("config log type doesn't exist");
- }
- }
-
- public int getType() {
- return type;
- }
-}
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/Response.java
b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/Response.java
deleted file mode 100644
index 4f4d1173c..000000000
---
a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/Response.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.common.reporpter;
-
-import lombok.Getter;
-import lombok.Setter;
-
-@Setter
-@Getter
-public class Response {
- private boolean success;
- private String errMsg;
- private String data;
-}
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/StreamConfigLogMetric.java
b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/StreamConfigLogMetric.java
deleted file mode 100644
index 364cb59a7..000000000
---
a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/StreamConfigLogMetric.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.common.reporpter;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.time.Instant;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.apache.inlong.common.reporpter.dto.StreamConfigLogInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class StreamConfigLogMetric implements Runnable {
-
- public static final Logger LOGGER =
LoggerFactory.getLogger(StreamConfigLogMetric.class);
-
- /*
- * config name for log report
- */
- public static final String CONFIG_LOG_REPORT_ENABLE =
"report.config.log.enable";
- public static final String CONFIG_LOG_REPORT_SERVER_URL =
"report.config.log.server.url";
- public static final String CONFIG_LOG_REPORT_INTERVAL =
"report.config.log.interval";
- public static final String CONFIG_LOG_REPORT_CLIENT_VERSION =
"report.config.log.client.version";
- public static final String CONFIG_LOG_PULSAR_PRODUCER = "pulsar-producer";
- public static final String CONFIG_LOG_PULSAR_CLIENT = "pulsar-client";
-
- private StreamConfigLogReporter streamConfigLogReporter;
-
- private String moduleName;
-
- private String clientVersion;
-
- private String localIp;
-
- private long reportInterval;
-
- public ConcurrentHashMap<String, StreamConfigLogInfo> dataCacheMap = new
ConcurrentHashMap<>();
-
- private static ScheduledExecutorService statExecutor =
- Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder()
- .setNameFormat("StreamConfigLogMetric-Report")
- .setUncaughtExceptionHandler((t, e) ->
- LOGGER.error(t.getName() + " has an uncaught
exception: ", e))
- .build());
-
- public StreamConfigLogMetric(String moduleName, String serverUrl, long
reportInterval,
- String localIp, String clientVersion) {
- this.streamConfigLogReporter = new StreamConfigLogReporter(serverUrl);
- this.reportInterval = reportInterval;
- this.moduleName = moduleName;
- this.localIp = localIp;
- this.clientVersion = clientVersion;
- statExecutor.scheduleWithFixedDelay(this,
- reportInterval, reportInterval, TimeUnit.MILLISECONDS);
- }
-
- /**
- * updateConfigLog
- * @param inlongGroupId inlongGroupId
- * @param inlongStreamId inlongStreamId
- * @param configName configName
- * @param configLogTypeEnum configLogTypeEnum
- * @param log log
- */
- public void updateConfigLog(String inlongGroupId, String inlongStreamId,
String configName,
- ConfigLogTypeEnum configLogTypeEnum, String log) {
- String key = moduleName + "-" + inlongGroupId + "-" + inlongStreamId
+ "-" + configName;
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("updateConfigLog key = {}", key);
- }
- dataCacheMap.compute(key, (k, v) -> {
- if (v == null) {
- v = new StreamConfigLogInfo();
- }
- updateDataValue(v, inlongGroupId,
- inlongStreamId, configName, configLogTypeEnum, log);
- return v;
- });
- }
-
- /**
- * Update value by config.
- */
- private void updateDataValue(StreamConfigLogInfo streamConfigLogInfo,
- String inlongGroupId, String inlongStreamId, String configName,
- ConfigLogTypeEnum configLogTypeEnum, String log) {
- streamConfigLogInfo.setComponentName(moduleName);
- streamConfigLogInfo.setConfigName(configName);
- streamConfigLogInfo.setInlongGroupId(inlongGroupId);
- streamConfigLogInfo.setInlongStreamId(inlongStreamId);
- streamConfigLogInfo.setIp(localIp);
- streamConfigLogInfo.setVersion(clientVersion);
- streamConfigLogInfo.setLogInfo(log);
- streamConfigLogInfo.setReportTime(Instant.now().toEpochMilli());
- streamConfigLogInfo.setLogType(configLogTypeEnum.getType());
- }
-
- /**
- * Logic entrance of Metric.
- */
- public void run() {
- try {
- Set<Entry<String, StreamConfigLogInfo>> set =
dataCacheMap.entrySet();
- long currentTimeMills = Instant.now().toEpochMilli();
- for (Entry<String, StreamConfigLogInfo> entry : set) {
- StreamConfigLogInfo streamConfigLogInfo = entry.getValue();
- if ((currentTimeMills - streamConfigLogInfo.getReportTime()) <
reportInterval) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Report metric data config key = {}!",
streamConfigLogInfo.getConfigName());
- }
-
streamConfigLogReporter.asyncReportData(streamConfigLogInfo);
- } else {
- dataCacheMap.remove(entry.getKey());
- LOGGER.info("Remove expired config key {}",
entry.getKey());
- }
- }
- } catch (Exception e) {
- LOGGER.error("Report streamConfigLogMetric has exception = {}", e);
- }
- }
-}
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/StreamConfigLogReporter.java
b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/StreamConfigLogReporter.java
deleted file mode 100644
index 14009a727..000000000
---
a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/StreamConfigLogReporter.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.common.reporpter;
-
-import java.util.concurrent.RejectedExecutionHandler;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.inlong.common.reporpter.dto.StreamConfigLogInfo;
-
-public class StreamConfigLogReporter extends
AbstractReporter<StreamConfigLogInfo> {
-
- public StreamConfigLogReporter(String serverUrl) {
- super(serverUrl);
- }
-
- public StreamConfigLogReporter(CloseableHttpClient httpClient, String
serverUrl) {
- super(httpClient, serverUrl);
- }
-
- public StreamConfigLogReporter(String serverUrl, int corePoolSize, int
maximumPoolsize,
- int syncSendQueueSize,
- RejectedExecutionHandler rejectedExecutionHandler) {
- super(serverUrl, corePoolSize, maximumPoolsize, syncSendQueueSize,
- rejectedExecutionHandler);
- }
-}
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/dto/StreamConfigLogInfo.java
b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/dto/StreamConfigLogInfo.java
deleted file mode 100644
index 3ce50e135..000000000
---
a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/dto/StreamConfigLogInfo.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.common.reporpter.dto;
-
-import lombok.Getter;
-import lombok.Setter;
-
-@Setter
-@Getter
-public class StreamConfigLogInfo {
-
- private String ip;
-
- private String version;
-
- private String componentName;
-
- private String configName;
-
- private Integer logType;
-
- private long reportTime;
-
- private String logInfo;
-
- private String inlongGroupId;
-
- private String inlongStreamId;
-
-}
diff --git
a/inlong-common/src/test/java/org/apache/inlong/common/metric/reporter/ReporterTest.java
b/inlong-common/src/test/java/org/apache/inlong/common/metric/reporter/ReporterTest.java
deleted file mode 100644
index eadbea2d6..000000000
---
a/inlong-common/src/test/java/org/apache/inlong/common/metric/reporter/ReporterTest.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.common.metric.reporter;
-
-import static org.mockito.Mockito.mock;
-
-import java.util.concurrent.Future;
-import org.apache.inlong.common.reporpter.Response;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.inlong.common.reporpter.StreamConfigLogReporter;
-import org.apache.inlong.common.reporpter.dto.StreamConfigLogInfo;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-public class ReporterTest {
-
- @Test
- public void streamConfigLogReporterTest() throws Exception {
- String serverUrl =
"http://127.0.0.1:8083/api/inlong/manager/openapi/stream/log"
- + "/reportConfigLogStatus";
- CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
- StreamConfigLogReporter streamConfigLogReporter = new
StreamConfigLogReporter(httpClient,
- serverUrl);
- StreamConfigLogReporter spy = Mockito.spy(streamConfigLogReporter);
- StreamConfigLogInfo info = new StreamConfigLogInfo();
- Future<Response> future = spy.asyncReportData(info, serverUrl);
- Assert.assertEquals(future.get(),null);
- }
-}
diff --git a/inlong-dataproxy/conf/common.properties
b/inlong-dataproxy/conf/common.properties
index 4639c54e0..8ec3857dc 100644
--- a/inlong-dataproxy/conf/common.properties
+++ b/inlong-dataproxy/conf/common.properties
@@ -35,8 +35,3 @@ prometheusHttpPort=9081
audit.enable=true
# audit proxy address
audit.proxys=127.0.0.1:10081
-
-# report config log
-report.config.log.enable=true
-report.config.log.server.url=http://127.0.0.1:8083/api/inlong/manager/openapi/stream/log/reportConfigLogStatus
-report.config.log.interval=60000
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
index 380826649..2c3dc712a 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
@@ -25,7 +25,6 @@ import com.google.common.collect.MapDifference;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.RateLimiter;
import io.netty.handler.codec.TooLongFrameException;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
@@ -34,12 +33,10 @@ import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
-import org.apache.inlong.common.enums.ComponentTypeEnum;
import org.apache.inlong.common.metric.MetricRegister;
import org.apache.inlong.common.monitor.LogCounter;
import org.apache.inlong.common.monitor.MonitorIndex;
import org.apache.inlong.common.monitor.MonitorIndexExt;
-import org.apache.inlong.common.reporpter.StreamConfigLogMetric;
import org.apache.inlong.dataproxy.base.HighPriorityThreadFactory;
import org.apache.inlong.dataproxy.base.OrderEvent;
import org.apache.inlong.dataproxy.config.ConfigManager;
@@ -64,6 +61,7 @@ import
org.apache.pulsar.client.api.PulsarClientException.TopicTerminatedExcepti
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nonnull;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -71,134 +69,117 @@ import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
- * use pulsarSink need adding such config, if these ara not config in
dataproxy-pulsar.conf,
+ * Use pulsarSink need adding such config, if these ara not config in
dataproxy-pulsar.conf,
* PulsarSink will use default value.
- * prefix of pulsar sink config in flume.conf like this XXX.sinks.XXX.property
- * and properties are may these Configurations:
- * type (*): value must be 'org.apache.inlong.dataproxy.sink.PulsarSink'
- * pulsar_server_url_list (*): value is pulsar broker url , like this
'pulsar://127.0.0.1:6650'
- * send_timeout_MILL: send message timeout, unit is millisecond, default
value is 30000 (mean 30s)
- * stat_interval_sec: stat info will be made period time , unit is second,
default value is 60s
- * thread-num: sink thread num. default value is 8
- * client-id-cache: whether use cache in client, default value is true
- * max_pending_messages: default value is 10000
- * max_batching_messages: default value is 1000
- * enable_batch: default is true
- * block_if_queue_full: default is true
+ * <p/>
+ *
+ * Prefix of Pulsar sink config in flume.conf like this XXX.sinks.XXX.property
and properties are may these
+ * configurations:
+ * <p/>
+ * <code>type</code> (*): value must be
'org.apache.inlong.dataproxy.sink.PulsarSink'
+ * <p/>
+ * <code>pulsar_server_url_list</code> (*): value is pulsar broker url, like
'pulsar://127.0.0.1:6650'
+ * <p/>
+ * <code>send_timeout_MILL</code>: send message timeout, unit is millisecond,
default is 30000 (mean 30s)
+ * <p/>
+ * <code>stat_interval_sec</code>: stat info will be made period time, unit is
second, default is 60s
+ * <p/>
+ * <code>thread-num</code>: sink thread num, default is 8
+ * <p/>
+ * <code>client-id-cache</code>: whether the client uses cache, default is true
+ * <p/>
+ * <code>max_pending_messages</code>: default is 10000
+ * <p/>
+ * <code>max_batching_messages</code>: default is 1000
+ * <p/>
+ * <code>enable_batch</code>: default is true
+ * <p/>
+ * <code>block_if_queue_full</code>: default is true
*/
-public class PulsarSink extends AbstractSink implements Configurable,
- SendMessageCallBack, CreatePulsarClientCallBack {
+public class PulsarSink extends AbstractSink implements Configurable,
SendMessageCallBack, CreatePulsarClientCallBack {
private static final Logger logger =
LoggerFactory.getLogger(PulsarSink.class);
-
- /*
- * properties for header info
+ /**
+ * log tools
*/
- private static String TOPIC = "topic";
+ private static final LogCounter logPrinterB = new LogCounter(10, 100000,
60 * 1000);
+ private static final LogCounter logPrinterC = new LogCounter(10, 100000,
60 * 1000);
+ private static final String SEPARATOR = "#";
+ private static final Long PRINT_INTERVAL = 30L;
+ private static final PulsarPerformanceTask PULSAR_PERFORMANCE_TASK = new
PulsarPerformanceTask();
+ private static final LoadingCache<String, Long> AGENT_ID_CACHE =
CacheBuilder.newBuilder()
+ .concurrencyLevel(4 *
8).initialCapacity(500).expireAfterAccess(30, TimeUnit.SECONDS)
+ .build(new CacheLoader<String, Long>() {
+ @Nonnull
+ @Override
+ public Long load(@Nonnull String key) {
+ return System.currentTimeMillis();
+ }
+ });
/*
- * for log
+ * properties for header info
*/
- private RateLimiter diskRateLimiter;
-
+ private static final String TOPIC = "topic";
/*
* for stat
*/
- private static AtomicLong totalPulsarSuccSendCnt = new AtomicLong(0);
- private static AtomicLong totalPulsarSuccSendSize = new AtomicLong(0);
- private AtomicLong currentSuccessSendCnt = new AtomicLong(0);
- private AtomicLong lastSuccessSendCnt = new AtomicLong(0);
- private long t1 = System.currentTimeMillis();
- private long t2 = 0L;
-
- private AtomicInteger processIndex = new AtomicInteger(0);
+ private static final AtomicLong TOTAL_PULSAR_SUCC_SEND_CNT = new
AtomicLong(0);
+ private static final AtomicLong TOTAL_PULSAR_SUCC_SEND_SIZE = new
AtomicLong(0);
+ private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE =
Executors
+ .newScheduledThreadPool(1, new
HighPriorityThreadFactory("pulsarPerformance-Printer-thread"));
- private int maxMonitorCnt = 300000;
+ static {
+
SCHEDULED_EXECUTOR_SERVICE.scheduleWithFixedDelay(PULSAR_PERFORMANCE_TASK, 0L,
+ PRINT_INTERVAL, TimeUnit.SECONDS);
+ logger.info("success to start pulsar performance task");
+ }
private final AtomicLong currentInFlightCount = new AtomicLong(0);
+ private final AtomicLong currentSuccessSendCnt = new AtomicLong(0);
+ private final AtomicLong lastSuccessSendCnt = new AtomicLong(0);
+ private final AtomicInteger processIndex = new AtomicInteger(0);
+ private RateLimiter diskRateLimiter;
+ private long t1 = System.currentTimeMillis();
+ private int maxMonitorCnt = 300000;
/*
* Control whether the SinkRunner thread can read data from the Channel
*/
private volatile boolean canTake = false;
-
- /*
- * log tools
- */
- private static final LogCounter logPrinterB = new LogCounter(10, 100000,
60 * 1000);
- private static final LogCounter logPrinterC = new LogCounter(10, 100000,
60 * 1000);
private SinkCounter sinkCounter;
-
/*
* message queue and retry
*/
private int eventQueueSize = 10000;
private int badEventQueueSize = 10000;
private int maxRetrySendCnt = 16;
-
/*
* send thread pool
*/
private SinkTask[] sinkThreadPool;
private int sinkThreadPoolSize;
private PulsarClientService pulsarClientService;
-
- private static final String SEPARATOR = "#";
-
-
/*
* statistic info log
*/
private MonitorIndex monitorIndex;
private MonitorIndexExt monitorIndexExt;
- /*
- * report error log
- */
- private StreamConfigLogMetric streamConfigLogMetric;
- private String localIp;
-
/*
* metric
*/
private Map<String, String> dimensions;
private DataProxyMetricItemSet metricItemSet;
-
private ConfigManager configManager;
- private Map<String, String> commonProperties;
private Map<String, String> topicProperties;
-
private Map<String, String> pulsarCluster;
private MQClusterConfig pulsarConfig;
- private static final Long PRINT_INTERVAL = 30L;
-
- private static final PulsarPerformanceTask pulsarPerformanceTask = new
PulsarPerformanceTask();
-
- private static ScheduledExecutorService scheduledExecutorService =
Executors
- .newScheduledThreadPool(1, new
HighPriorityThreadFactory("pulsarPerformance-Printer-thread"));
-
- private static final LoadingCache<String, Long> agentIdCache =
CacheBuilder.newBuilder()
- .concurrencyLevel(4 *
8).initialCapacity(500).expireAfterAccess(30, TimeUnit.SECONDS)
- .build(new CacheLoader<String, Long>() {
- @Override
- public Long load(String key) {
- return System.currentTimeMillis();
- }
- });
-
- static {
- /*
- * stat pulsar performance
- */
- logger.info("pulsarPerformanceTask!!!!!!");
- scheduledExecutorService.scheduleWithFixedDelay(pulsarPerformanceTask,
0L,
- PRINT_INTERVAL, TimeUnit.SECONDS);
- }
-
public PulsarSink() {
super();
logger.debug("new instance of PulsarSink!");
@@ -206,7 +187,6 @@ public class PulsarSink extends AbstractSink implements
Configurable,
/**
* configure
- * @param context
*/
@Override
public void configure(Context context) {
@@ -217,35 +197,18 @@ public class PulsarSink extends AbstractSink implements
Configurable,
topicProperties = configManager.getTopicProperties();
pulsarCluster = configManager.getMqClusterUrl2Token();
pulsarConfig = configManager.getMqClusterConfig(); //pulsar common
config
- commonProperties = configManager.getCommonProperties();
+ Map<String, String> commonProperties =
configManager.getCommonProperties();
sinkThreadPoolSize = pulsarConfig.getThreadNum();
if (sinkThreadPoolSize <= 0) {
sinkThreadPoolSize = 1;
}
pulsarClientService = new PulsarClientService(pulsarConfig,
sinkThreadPoolSize);
- boolean enableReportConfigLog =
- Boolean.parseBoolean(commonProperties
-
.getOrDefault(StreamConfigLogMetric.CONFIG_LOG_REPORT_ENABLE,"true"));
- localIp = NetworkUtils.getLocalIp();
- if (enableReportConfigLog) {
- String reportConfigServerUrl = commonProperties
-
.getOrDefault(StreamConfigLogMetric.CONFIG_LOG_REPORT_SERVER_URL, "");
- String reportConfigLogInterval = commonProperties
-
.getOrDefault(StreamConfigLogMetric.CONFIG_LOG_REPORT_INTERVAL, "60000");
- String clientVersion = commonProperties
-
.getOrDefault(StreamConfigLogMetric.CONFIG_LOG_REPORT_CLIENT_VERSION, "");
- streamConfigLogMetric = new
StreamConfigLogMetric(ComponentTypeEnum.DataProxy.getName(),
- reportConfigServerUrl,
Long.parseLong(reportConfigLogInterval),
- localIp, clientVersion);
- pulsarClientService.setConfigLogMetric(streamConfigLogMetric);
- }
configManager.getTopicConfig().addUpdateCallback(new
ConfigUpdateCallback() {
@Override
public void update() {
if (pulsarClientService != null) {
- diffSetPublish(pulsarClientService,
- new HashSet<>(topicProperties.values()),
+ diffSetPublish(pulsarClientService, new
HashSet<>(topicProperties.values()),
new
HashSet<>(configManager.getTopicProperties().values()));
}
}
@@ -254,8 +217,7 @@ public class PulsarSink extends AbstractSink implements
Configurable,
@Override
public void update() {
if (pulsarClientService != null) {
- diffUpdatePulsarClient(pulsarClientService, pulsarCluster,
- configManager.getMqClusterUrl2Token());
+ diffUpdatePulsarClient(pulsarClientService, pulsarCluster,
configManager.getMqClusterUrl2Token());
}
}
});
@@ -280,19 +242,14 @@ public class PulsarSink extends AbstractSink implements
Configurable,
pulsarClientService.initTopicProducer(topic);
}
}
- logger.info(getName() + " initTopicSet cost: "
- + (System.currentTimeMillis() - startTime) + "ms");
- logger.info(getName() + " producer is ready for topics : "
- + pulsarClientService.getProducerInfoMap().keySet());
+ logger.info(getName() + " initTopicSet cost: " +
(System.currentTimeMillis() - startTime) + "ms");
+ logger.info(getName() + " producer is ready for topics: " +
pulsarClientService.getProducerInfoMap().keySet());
}
/**
* When topic.properties is re-enabled, the producer update is triggered
- * @param originalSet
- * @param endSet
*/
- public void diffSetPublish(PulsarClientService pulsarClientService,
Set<String> originalSet,
- Set<String> endSet) {
+ public void diffSetPublish(PulsarClientService pulsarClientService,
Set<String> originalSet, Set<String> endSet) {
boolean changed = false;
for (String s : endSet) {
if (!originalSet.contains(s)) {
@@ -300,7 +257,7 @@ public class PulsarSink extends AbstractSink implements
Configurable,
try {
pulsarClientService.initTopicProducer(s);
} catch (Exception e) {
- logger.error("Get producer failed!", e);
+ logger.error("get producer failed: ", e);
}
}
}
@@ -312,22 +269,17 @@ public class PulsarSink extends AbstractSink implements
Configurable,
/**
* When pulsarURLList change, close and restart
- *
- * @param originalCluster
- * @param endCluster
*/
public void diffUpdatePulsarClient(PulsarClientService
pulsarClientService, Map<String, String> originalCluster,
- Map<String, String> endCluster) {
+ Map<String, String> endCluster) {
MapDifference<String, String> mapDifference =
Maps.difference(originalCluster, endCluster);
if (mapDifference.areEqual()) {
return;
}
logger.info("pulsarConfig has changed, close unused url clients and
start new url clients");
- Map<String, String> needToStart = new HashMap<>();
- Map<String, String> needToClose = new HashMap<>();
- needToClose.putAll(mapDifference.entriesOnlyOnLeft());
- needToStart.putAll(mapDifference.entriesOnlyOnRight());
+ Map<String, String> needToClose = new
HashMap<>(mapDifference.entriesOnlyOnLeft());
+ Map<String, String> needToStart = new
HashMap<>(mapDifference.entriesOnlyOnRight());
Map<String, MapDifference.ValueDifference<String>> differentToken =
mapDifference.entriesDiffering();
for (String url : differentToken.keySet()) {
needToClose.put(url, originalCluster.get(url));
@@ -343,7 +295,7 @@ public class PulsarSink extends AbstractSink implements
Configurable,
@Override
public void start() {
logger.info("[{}] pulsar sink starting...", getName());
- //register metrics
+ // register metrics
this.dimensions = new HashMap<>();
this.dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, "DataProxy");
this.dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getName());
@@ -354,9 +306,7 @@ public class PulsarSink extends AbstractSink implements
Configurable,
int statIntervalSec = pulsarConfig.getStatIntervalSec();
Preconditions.checkArgument(statIntervalSec >= 0, "statIntervalSec
must be >= 0");
if (statIntervalSec > 0) {
- /*
- * switch for lots of metrics
- */
+ // switch for lots of metrics
monitorIndex = new MonitorIndex("Pulsar_Sink", statIntervalSec,
maxMonitorCnt);
monitorIndexExt = new MonitorIndexExt("Pulsar_Sink_monitors#" +
this.getName(),
statIntervalSec, maxMonitorCnt);
@@ -391,7 +341,7 @@ public class PulsarSink extends AbstractSink implements
Configurable,
int waitCount = 0;
while (isAllSendFinished() && waitCount++ < 10) {
try {
- Thread.currentThread().sleep(1000);
+ Thread.sleep(1000);
} catch (InterruptedException e) {
logger.info("Stop thread has been interrupt!");
break;
@@ -418,8 +368,8 @@ public class PulsarSink extends AbstractSink implements
Configurable,
}
super.stop();
- if (!scheduledExecutorService.isShutdown()) {
- scheduledExecutorService.shutdown();
+ if (!SCHEDULED_EXECUTOR_SERVICE.isShutdown()) {
+ SCHEDULED_EXECUTOR_SERVICE.shutdown();
}
sinkCounter.stop();
logger.debug("pulsar sink stopped. Metrics:{}", sinkCounter);
@@ -456,16 +406,14 @@ public class PulsarSink extends AbstractSink implements
Configurable,
+ "last long time it will cause memoryChannel full
and fileChannel write.)", getName());
tx.rollback();
// metric
- dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID,
- event.getHeaders().getOrDefault(TOPIC, ""));
+ dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID,
event.getHeaders().getOrDefault(TOPIC, ""));
DataProxyMetricItem metricItem =
this.metricItemSet.findMetricItem(dimensions);
metricItem.readFailCount.incrementAndGet();
metricItem.readFailSize.addAndGet(event.getBody().length);
} else {
tx.commit();
// metric
- dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID,
- event.getHeaders().getOrDefault(TOPIC, ""));
+ dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID,
event.getHeaders().getOrDefault(TOPIC, ""));
DataProxyMetricItem metricItem =
this.metricItemSet.findMetricItem(dimensions);
metricItem.readSuccessCount.incrementAndGet();
metricItem.readFailSize.addAndGet(event.getBody().length);
@@ -491,7 +439,7 @@ public class PulsarSink extends AbstractSink implements
Configurable,
private void editStatistic(final Event event, String keyPostfix, boolean
isOrder) {
String topic = "";
String streamId = "";
- String nodeIp = null;
+ String nodeIp;
if (event != null) {
if (event.getHeaders().containsKey(TOPIC)) {
topic = event.getHeaders().get(TOPIC);
@@ -502,17 +450,13 @@ public class PulsarSink extends AbstractSink implements
Configurable,
streamId = event.getHeaders().get(AttributeConstants.INAME);
}
- /*
- * Compatible agent
- */
+ // Compatible agent
if (event.getHeaders().containsKey("ip")) {
event.getHeaders().put(ConfigConstants.REMOTE_IP_KEY,
event.getHeaders().get("ip"));
event.getHeaders().remove("ip");
}
- /*
- * Compatible agent
- */
+ // Compatible agent
if (event.getHeaders().containsKey("time")) {
event.getHeaders().put(AttributeConstants.DATA_TIME,
event.getHeaders().get("time"));
event.getHeaders().remove("time");
@@ -521,46 +465,39 @@ public class PulsarSink extends AbstractSink implements
Configurable,
if (event.getHeaders().containsKey(ConfigConstants.REMOTE_IP_KEY))
{
nodeIp = event.getHeaders().get(ConfigConstants.REMOTE_IP_KEY);
if
(event.getHeaders().containsKey(ConfigConstants.REMOTE_IDC_KEY)) {
-
if (nodeIp != null) {
nodeIp = nodeIp.split(":")[0];
}
- long tMsgCounterL = 1L;
- /*
- * msg counter
- */
+ long msgCounterL = 1L;
+ // msg counter
if
(event.getHeaders().containsKey(ConfigConstants.MSG_COUNTER_KEY)) {
- tMsgCounterL = Integer.parseInt(event.getHeaders()
- .get(ConfigConstants.MSG_COUNTER_KEY));
+ msgCounterL =
Integer.parseInt(event.getHeaders().get(ConfigConstants.MSG_COUNTER_KEY));
}
String orderType = "non-order";
if (isOrder) {
orderType = "order";
}
- StringBuilder newbase = new StringBuilder();
-
newbase.append(this.getName()).append(SEPARATOR).append(topic).append(SEPARATOR)
+ StringBuilder newBase = new StringBuilder();
+
newBase.append(this.getName()).append(SEPARATOR).append(topic).append(SEPARATOR)
.append(streamId).append(SEPARATOR).append(nodeIp)
.append(SEPARATOR).append(NetworkUtils.getLocalIp())
.append(SEPARATOR).append(orderType).append(SEPARATOR)
.append(event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
long messageSize = event.getBody().length;
-
if (event.getHeaders().get(ConfigConstants.TOTAL_LEN) !=
null) {
messageSize =
Long.parseLong(event.getHeaders().get(ConfigConstants.TOTAL_LEN));
}
if (keyPostfix != null && !keyPostfix.equals("")) {
- monitorIndex.addAndGet(new String(newbase), 0, 0,
- 0, (int) tMsgCounterL);
+ monitorIndex.addAndGet(new String(newBase), 0, 0, 0,
(int) msgCounterL);
if (logPrinterB.shouldPrint()) {
logger.warn("error cannot send event, {} event
size is {}", topic, messageSize);
}
} else {
- monitorIndex.addAndGet(new String(newbase), (int)
tMsgCounterL,
- 1, messageSize, 0);
+ monitorIndex.addAndGet(new String(newBase), (int)
msgCounterL, 1, messageSize, 0);
}
}
}
@@ -584,8 +521,8 @@ public class PulsarSink extends AbstractSink implements
Configurable,
/*
* Statistics pulsar performance
*/
- totalPulsarSuccSendCnt.incrementAndGet();
-
totalPulsarSuccSendSize.addAndGet(eventStat.getEvent().getBody().length);
+ TOTAL_PULSAR_SUCC_SEND_CNT.incrementAndGet();
+
TOTAL_PULSAR_SUCC_SEND_SIZE.addAndGet(eventStat.getEvent().getBody().length);
/*
*add to sinkCounter
*/
@@ -599,15 +536,12 @@ public class PulsarSink extends AbstractSink implements
Configurable,
if (nowCnt % logEveryNEvents == 0 && nowCnt !=
lastSuccessSendCnt.get()) {
lastSuccessSendCnt.set(nowCnt);
- t2 = System.currentTimeMillis();
- logger.info("Pulsar sink {}, succ put {} events to pulsar,"
- + " in the past {} millsec", new Object[] {
- getName(), (nowCnt - oldCnt), (t2 - t1)
- });
+ long t2 = System.currentTimeMillis();
+ logger.info("Pulsar sink {}, succ put {} events to pulsar in the
past {} millsec",
+ getName(), (nowCnt - oldCnt), (t2 - t1));
t1 = t2;
}
- Map<String, String> dimensions =
getNewDimension(DataProxyMetricItem.KEY_SINK_DATA_ID,
- topic);
+ Map<String, String> dimensions =
getNewDimension(DataProxyMetricItem.KEY_SINK_DATA_ID, topic);
DataProxyMetricItem metricItem =
this.metricItemSet.findMetricItem(dimensions);
metricItem.sendSuccessCount.incrementAndGet();
metricItem.sendSuccessSize.addAndGet(eventStat.getEvent().getBody().length);
@@ -623,24 +557,23 @@ public class PulsarSink extends AbstractSink implements
Configurable,
monitorIndexExt.incrementAndGet("PULSAR_SINK_EXP");
boolean needRetry = true;
if (e instanceof NotFoundException) {
- logger.error("NotFoundException topic {}, message will be
discard!", topic, e);
+ logger.error("NotFoundException for topic " + topic + ", message
will be discard!", e);
needRetry = false;
} else if (e instanceof TooLongFrameException) {
- logger.error("Send failed TooLongFrameException! {}{}", getName(),
e);
+ logger.error("TooLongFrameException, send failed for " +
getName(), e);
} else if (e instanceof ProducerQueueIsFullError) {
- logger.error("Send failed ProducerQueueIsFullError! {}{}",
getName(), e);
+ logger.error("ProducerQueueIsFullError, send failed for " +
getName(), e);
} else if (!(e instanceof AlreadyClosedException
|| e instanceof PulsarClientException.NotConnectedException
|| e instanceof TopicTerminatedException)) {
if (logPrinterB.shouldPrint()) {
- logger.error("Send failed!{}{}", getName(), e);
+ logger.error("send failed for " + getName(), e);
}
if (eventStat.getRetryCnt() == 0) {
editStatistic(eventStat.getEvent(), "failure",
eventStat.isOrderMessage());
}
}
- Map<String, String> dimensions =
getNewDimension(DataProxyMetricItem.KEY_SINK_DATA_ID,
- topic);
+ Map<String, String> dimensions =
getNewDimension(DataProxyMetricItem.KEY_SINK_DATA_ID, topic);
DataProxyMetricItem metricItem =
this.metricItemSet.findMetricItem(dimensions);
metricItem.sendFailCount.incrementAndGet();
metricItem.sendFailSize.addAndGet(eventStat.getEvent().getBody().length);
@@ -650,8 +583,8 @@ public class PulsarSink extends AbstractSink implements
Configurable,
}
}
- private Map getNewDimension(String otherKey, String value) {
- Map dimensions = new HashMap<>();
+ private Map<String, String> getNewDimension(String otherKey, String value)
{
+ Map<String, String> dimensions = new HashMap<>();
dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, "DataProxy");
dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getName());
dimensions.put(otherKey, value);
@@ -659,10 +592,11 @@ public class PulsarSink extends AbstractSink implements
Configurable,
}
private boolean processEvent(EventStat eventStat) {
- boolean result = true;
if (eventStat == null || eventStat.getEvent() == null) {
- return result;
+ return true;
}
+
+ boolean result = true;
Event event = eventStat.getEvent();
if (MessageUtils.isSyncSendForOrder(event) && (event instanceof
OrderEvent)) {
String partitionKey =
event.getHeaders().get(AttributeConstants.MESSAGE_PARTITION_KEY);
@@ -701,8 +635,8 @@ public class PulsarSink extends AbstractSink implements
Configurable,
*/
if (pulsarConfig.getClientIdCache()) {
String clientId =
eventStat.getEvent().getHeaders().get(ConfigConstants.SEQUENCE_ID);
- if (clientId != null &&
agentIdCache.asMap().containsKey(clientId)) {
- agentIdCache.invalidate(clientId);
+ if (clientId != null &&
AGENT_ID_CACHE.asMap().containsKey(clientId)) {
+ AGENT_ID_CACHE.invalidate(clientId);
}
}
boolean result = false;
@@ -737,7 +671,7 @@ public class PulsarSink extends AbstractSink implements
Configurable,
}
public LoadingCache<String, Long> getAgentIdCache() {
- return agentIdCache;
+ return AGENT_ID_CACHE;
}
public Map<String, String> getTopicsProperties() {
@@ -761,20 +695,19 @@ public class PulsarSink extends AbstractSink implements
Configurable,
}
static class PulsarPerformanceTask implements Runnable {
+
@Override
public void run() {
try {
- if (totalPulsarSuccSendSize.get() != 0) {
+ if (TOTAL_PULSAR_SUCC_SEND_SIZE.get() != 0) {
logger.info("Total pulsar performance tps :"
- + totalPulsarSuccSendCnt.get() / PRINT_INTERVAL
+ + TOTAL_PULSAR_SUCC_SEND_CNT.get() / PRINT_INTERVAL
+ "/s, avg msg size:"
- + totalPulsarSuccSendSize.get() /
totalPulsarSuccSendCnt.get()
+ + TOTAL_PULSAR_SUCC_SEND_SIZE.get() /
TOTAL_PULSAR_SUCC_SEND_CNT.get()
+ ",print every " + PRINT_INTERVAL + " seconds");
- /*
- * totalpulsarSuccSendCnt represents the number of packets
- */
- totalPulsarSuccSendCnt.set(0);
- totalPulsarSuccSendSize.set(0);
+ // totalPulsarSuccSendCnt represents the number of packets
+ TOTAL_PULSAR_SUCC_SEND_CNT.set(0);
+ TOTAL_PULSAR_SUCC_SEND_SIZE.set(0);
}
} catch (Exception e) {
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
index ad00702d7..28af3fa1c 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
@@ -22,8 +22,6 @@ import io.netty.buffer.ByteBuf;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
-import org.apache.inlong.common.reporpter.ConfigLogTypeEnum;
-import org.apache.inlong.common.reporpter.StreamConfigLogMetric;
import org.apache.inlong.dataproxy.base.OrderEvent;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
@@ -42,7 +40,6 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.NotFoundException;
-import org.apache.pulsar.client.impl.MessageIdImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -87,7 +84,6 @@ public class PulsarClientService {
private long retryIntervalWhenSendMsgError = 30 * 1000L;
private String localIp = "127.0.0.1";
- private StreamConfigLogMetric streamConfigLogMetric;
private int sinkThreadPoolSize;
/**
@@ -127,10 +123,6 @@ public class PulsarClientService {
localIp = NetworkUtils.getLocalIp();
}
- public void setConfigLogMetric(StreamConfigLogMetric
streamConfigLogMetric) {
- this.streamConfigLogMetric = streamConfigLogMetric;
- }
-
public void initCreateConnection(CreatePulsarClientCallBack callBack) {
pulsarUrl2token = ConfigManager.getInstance().getMqClusterUrl2Token();
if (pulsarUrl2token == null || pulsarUrl2token.isEmpty()) {
@@ -151,19 +143,13 @@ public class PulsarClientService {
public boolean sendMessage(int poolIndex, String topic, Event event,
SendMessageCallBack sendMessageCallBack, EventStat es) {
TopicProducerInfo producerInfo = null;
- boolean result = false;
+ boolean result;
final String inlongStreamId = getInlongStreamId(event);
final String inlongGroupId = getInlongGroupId(event);
try {
producerInfo = getProducerInfo(poolIndex, topic, inlongGroupId,
inlongStreamId);
} catch (Exception e) {
- producerInfo = null;
- logger.error("get producer failed! topic = " + topic, e);
- if (streamConfigLogMetric != null) {
- streamConfigLogMetric.updateConfigLog(inlongGroupId,
- inlongStreamId,
StreamConfigLogMetric.CONFIG_LOG_PULSAR_PRODUCER,
- ConfigLogTypeEnum.ERROR, e.toString());
- }
+ logger.error("get producer failed for topic=" + topic, e);
}
/*
* If the producer is a null value,\ it means that the topic is not yet
@@ -196,25 +182,19 @@ public class PulsarClientService {
if (es.isOrderMessage()) {
String partitionKey =
event.getHeaders().get(AttributeConstants.MESSAGE_PARTITION_KEY);
try {
- MessageId msgId = producer.newMessage().key(partitionKey)
- .properties(proMap).value(event.getBody())
+ MessageId msgId = producer.newMessage()
+ .properties(proMap)
+ .key(partitionKey)
+ .value(event.getBody())
.send();
sendMessageCallBack.handleMessageSendSuccess(topic, msgId, es);
AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS,
event);
forCallBackP.setCanUseSend(true);
result = true;
} catch (PulsarClientException ex) {
- if (streamConfigLogMetric != null) {
- streamConfigLogMetric.updateConfigLog(inlongGroupId,
- inlongStreamId,
StreamConfigLogMetric.CONFIG_LOG_PULSAR_PRODUCER,
- ConfigLogTypeEnum.ERROR, ex.toString());
- }
forCallBackP.setCanUseSend(false);
sendMessageCallBack.handleMessageSendException(topic, es, ex);
- result = false;
- if (ex instanceof NotFoundException) {
- result = true;
- }
+ result = ex instanceof NotFoundException;
}
/*
* avoid client timeout
@@ -224,17 +204,15 @@ public class PulsarClientService {
sendResponse((OrderEvent) event, inlongGroupId,
inlongStreamId);
}
} else {
- producer.newMessage().properties(proMap).value(event.getBody())
- .sendAsync().thenAccept((msgId) -> {
+ producer.newMessage().properties(proMap)
+ .value(event.getBody())
+ .sendAsync()
+ .thenAccept((msgId) -> {
AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, event);
forCallBackP.setCanUseSend(true);
- sendMessageCallBack.handleMessageSendSuccess(topic,
(MessageIdImpl) msgId, es);
- }).exceptionally((e) -> {
- if (streamConfigLogMetric != null) {
-
streamConfigLogMetric.updateConfigLog(inlongGroupId,
- inlongStreamId,
StreamConfigLogMetric.CONFIG_LOG_PULSAR_PRODUCER,
- ConfigLogTypeEnum.ERROR, e.toString());
- }
+ sendMessageCallBack.handleMessageSendSuccess(topic,
msgId, es);
+ })
+ .exceptionally((e) -> {
forCallBackP.setCanUseSend(false);
sendMessageCallBack.handleMessageSendException(topic,
es, e);
return null;
@@ -297,12 +275,6 @@ public class PulsarClientService {
callBack.handleCreateClientSuccess(info.getKey());
} catch (PulsarClientException e) {
callBack.handleCreateClientException(info.getKey());
- if (streamConfigLogMetric != null) {
- streamConfigLogMetric.updateConfigLog("DataProxyGlobal",
- "DataProxyGlobal",
- StreamConfigLogMetric.CONFIG_LOG_PULSAR_CLIENT,
- ConfigLogTypeEnum.ERROR, e.toString());
- }
logger.error("create connection error in Pulsar sink, "
+ "maybe pulsar master set error, please re-check. url
" + info.getKey(), e);
} catch (Throwable e) {
@@ -358,15 +330,14 @@ public class PulsarClientService {
private TopicProducerInfo getProducerInfo(int poolIndex, String topic,
String inlongGroupId,
String inlongStreamId) {
- List<TopicProducerInfo> producerList =
- initTopicProducer(topic, inlongGroupId, inlongStreamId);
+ List<TopicProducerInfo> producerList = initTopicProducer(topic,
inlongGroupId, inlongStreamId);
AtomicLong topicIndex = topicSendIndexMap.computeIfAbsent(topic, (k)
-> new AtomicLong(0));
int maxTryToGetProducer = producerList == null ? 0 :
producerList.size();
if (maxTryToGetProducer == 0) {
return null;
}
int retryTime = 0;
- TopicProducerInfo p = null;
+ TopicProducerInfo p;
do {
int index = (int) (topicIndex.getAndIncrement() %
maxTryToGetProducer);
p = producerList.get(index);
@@ -410,7 +381,7 @@ public class PulsarClientService {
/**
* close pulsarClients(the related url is removed); start pulsarClients
for new url, and create producers for them
*
- * @param callBack
+ * @param callBack callback notified when creating a pulsar client succeeds or fails
* @param needToClose url-token map
* @param needToStart url-token map
* @param topicSet for new pulsarClient, create these topics' producers
@@ -453,11 +424,11 @@ public class PulsarClientService {
} catch (PulsarClientException e) {
callBack.handleCreateClientException(url);
- logger.error("create connnection error in pulsar sink, "
+ logger.error("create connection error in pulsar sink, "
+ "maybe pulsar master set error, please re-check.url
" + url, e);
} catch (Throwable e) {
callBack.handleCreateClientException(url);
- logger.error("create connnection error in pulsar sink, "
+ logger.error("create connection error in pulsar sink, "
+ "maybe pulsar master set error/shutdown in progress,
please "
+ "re-check. url " + url, e);
}
@@ -496,16 +467,11 @@ public class PulsarClientService {
class TopicProducerInfo {
+ private final Producer[] producers;
+ private final PulsarClient pulsarClient;
+ private final int sinkThreadPoolSize;
+ private final String topic;
private long lastSendMsgErrorTime;
-
- private Producer[] producers;
-
- private PulsarClient pulsarClient;
-
- private int sinkThreadPoolSize;
-
- private String topic;
-
private volatile Boolean isCanUseSend = true;
private volatile Boolean isFinishInit = false;
@@ -536,13 +502,6 @@ public class PulsarClientService {
producers[i].closeAsync();
}
}
- if (streamConfigLogMetric != null
- && StringUtils.isNotEmpty(inlongGroupId)
- && StringUtils.isNotEmpty(inlongStreamId)) {
- streamConfigLogMetric.updateConfigLog(inlongGroupId,
- inlongStreamId,
StreamConfigLogMetric.CONFIG_LOG_PULSAR_CLIENT,
- ConfigLogTypeEnum.ERROR, e.toString());
- }
}
}