This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 78ae5ff38 [INLONG-5470][Sort] Fix compile error (#5471)
78ae5ff38 is described below
commit 78ae5ff38996c2790e2184dc840ad1a7416d26c5
Author: Charles <[email protected]>
AuthorDate: Wed Aug 10 22:29:34 2022 +0800
[INLONG-5470][Sort] Fix compile error (#5471)
---
.../inlong/sort/elasticsearch/ElasticsearchSinkBase.java | 16 +++++++---------
.../table/RowElasticsearchSinkFunction.java | 15 +++++++--------
2 files changed, 14 insertions(+), 17 deletions(-)
diff --git
a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
index 4dd35d060..399b629cb 100644
---
a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
+++
b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
@@ -265,20 +265,18 @@ public abstract class ElasticsearchSinkBase<T, C extends
AutoCloseable> extends
@Override
public void open(Configuration parameters) throws Exception {
client = callBridge.createClient(userConfig);
- sinkMetricData = new
SinkMetricData(getRuntimeContext().getMetricGroup());
if (inLongMetric != null && !inLongMetric.isEmpty()) {
String[] inLongMetricArray = inLongMetric.split("&");
String groupId = inLongMetricArray[0];
String streamId = inLongMetricArray[1];
String nodeId = inLongMetricArray[2];
- sinkMetricData.registerMetricsForDirtyBytes(groupId, streamId,
nodeId, "dirtyBytes");
- sinkMetricData.registerMetricsForDirtyRecords(groupId, streamId,
nodeId, "dirtyRecords");
- sinkMetricData.registerMetricsForNumBytesOut(groupId, streamId,
nodeId, "numBytesOut");
- sinkMetricData.registerMetricsForNumRecordsOut(groupId, streamId,
nodeId, "numRecordsOut");
- sinkMetricData.registerMetricsForNumBytesOutPerSecond(groupId,
streamId, nodeId,
- "numBytesOutPerSecond");
- sinkMetricData.registerMetricsForNumRecordsOutPerSecond(groupId,
streamId, nodeId,
- "numRecordsOutPerSecond");
+ sinkMetricData = new SinkMetricData(groupId, streamId, nodeId,
getRuntimeContext().getMetricGroup());
+ sinkMetricData.registerMetricsForDirtyBytes();
+ sinkMetricData.registerMetricsForDirtyRecords();
+ sinkMetricData.registerMetricsForNumBytesOut();
+ sinkMetricData.registerMetricsForNumRecordsOut();
+ sinkMetricData.registerMetricsForNumBytesOutPerSecond();
+ sinkMetricData.registerMetricsForNumRecordsOutPerSecond();
}
callBridge.verifyClientConnection(client);
bulkProcessor = buildBulkProcessor(new
BulkProcessorListener(sinkMetricData));
diff --git
a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
index 8af55faaf..d300158d9 100644
---
a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
+++
b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
@@ -82,19 +82,18 @@ public class RowElasticsearchSinkFunction implements
ElasticsearchSinkFunction<R
public void open(RuntimeContext ctx) {
indexGenerator.open();
this.runtimeContext = ctx;
- sinkMetricData = new SinkMetricData(runtimeContext.getMetricGroup());
if (inLongMetric != null && !inLongMetric.isEmpty()) {
String[] inLongMetricArray = inLongMetric.split("&");
String groupId = inLongMetricArray[0];
String streamId = inLongMetricArray[1];
String nodeId = inLongMetricArray[2];
- sinkMetricData.registerMetricsForDirtyBytes(groupId, streamId,
nodeId, "dirtyBytes");
- sinkMetricData.registerMetricsForDirtyRecords(groupId, streamId,
nodeId, "dirtyRecords");
- sinkMetricData.registerMetricsForNumBytesOut(groupId, streamId,
nodeId, "numBytesOut");
- sinkMetricData.registerMetricsForNumRecordsOut(groupId, streamId,
nodeId, "numRecordsOut");
- sinkMetricData.registerMetricsForNumBytesOutPerSecond(groupId,
streamId, nodeId, "numBytesOutPerSecond");
- sinkMetricData.registerMetricsForNumRecordsOutPerSecond(groupId,
streamId, nodeId,
- "numRecordsOutPerSecond");
+ sinkMetricData = new SinkMetricData(groupId, streamId, nodeId,
runtimeContext.getMetricGroup());
+ sinkMetricData.registerMetricsForDirtyBytes();
+ sinkMetricData.registerMetricsForDirtyRecords();
+ sinkMetricData.registerMetricsForNumBytesOut();
+ sinkMetricData.registerMetricsForNumRecordsOut();
+ sinkMetricData.registerMetricsForNumBytesOutPerSecond();
+ sinkMetricData.registerMetricsForNumRecordsOutPerSecond();
}
}