This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 78ae5ff38 [INLONG-5470][Sort] Fix compile error (#5471)
78ae5ff38 is described below

commit 78ae5ff38996c2790e2184dc840ad1a7416d26c5
Author: Charles <[email protected]>
AuthorDate: Wed Aug 10 22:29:34 2022 +0800

    [INLONG-5470][Sort] Fix compile error (#5471)
---
 .../inlong/sort/elasticsearch/ElasticsearchSinkBase.java | 16 +++++++---------
 .../table/RowElasticsearchSinkFunction.java              | 15 +++++++--------
 2 files changed, 14 insertions(+), 17 deletions(-)

diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
index 4dd35d060..399b629cb 100644
--- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
@@ -265,20 +265,18 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends
     @Override
     public void open(Configuration parameters) throws Exception {
         client = callBridge.createClient(userConfig);
-        sinkMetricData = new SinkMetricData(getRuntimeContext().getMetricGroup());
         if (inLongMetric != null && !inLongMetric.isEmpty()) {
             String[] inLongMetricArray = inLongMetric.split("&");
             String groupId = inLongMetricArray[0];
             String streamId = inLongMetricArray[1];
             String nodeId = inLongMetricArray[2];
-            sinkMetricData.registerMetricsForDirtyBytes(groupId, streamId, nodeId, "dirtyBytes");
-            sinkMetricData.registerMetricsForDirtyRecords(groupId, streamId, nodeId, "dirtyRecords");
-            sinkMetricData.registerMetricsForNumBytesOut(groupId, streamId, nodeId, "numBytesOut");
-            sinkMetricData.registerMetricsForNumRecordsOut(groupId, streamId, nodeId, "numRecordsOut");
-            sinkMetricData.registerMetricsForNumBytesOutPerSecond(groupId, streamId, nodeId,
-                    "numBytesOutPerSecond");
-            sinkMetricData.registerMetricsForNumRecordsOutPerSecond(groupId, streamId, nodeId,
-                    "numRecordsOutPerSecond");
+            sinkMetricData = new SinkMetricData(groupId, streamId, nodeId, getRuntimeContext().getMetricGroup());
+            sinkMetricData.registerMetricsForDirtyBytes();
+            sinkMetricData.registerMetricsForDirtyRecords();
+            sinkMetricData.registerMetricsForNumBytesOut();
+            sinkMetricData.registerMetricsForNumRecordsOut();
+            sinkMetricData.registerMetricsForNumBytesOutPerSecond();
+            sinkMetricData.registerMetricsForNumRecordsOutPerSecond();
         }
         callBridge.verifyClientConnection(client);
         bulkProcessor = buildBulkProcessor(new BulkProcessorListener(sinkMetricData));
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
index 8af55faaf..d300158d9 100644
--- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
@@ -82,19 +82,18 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
     public void open(RuntimeContext ctx) {
         indexGenerator.open();
         this.runtimeContext = ctx;
-        sinkMetricData = new SinkMetricData(runtimeContext.getMetricGroup());
         if (inLongMetric != null && !inLongMetric.isEmpty()) {
             String[] inLongMetricArray = inLongMetric.split("&");
             String groupId = inLongMetricArray[0];
             String streamId = inLongMetricArray[1];
             String nodeId = inLongMetricArray[2];
-            sinkMetricData.registerMetricsForDirtyBytes(groupId, streamId, nodeId, "dirtyBytes");
-            sinkMetricData.registerMetricsForDirtyRecords(groupId, streamId, nodeId, "dirtyRecords");
-            sinkMetricData.registerMetricsForNumBytesOut(groupId, streamId, nodeId, "numBytesOut");
-            sinkMetricData.registerMetricsForNumRecordsOut(groupId, streamId, nodeId, "numRecordsOut");
-            sinkMetricData.registerMetricsForNumBytesOutPerSecond(groupId, streamId, nodeId, "numBytesOutPerSecond");
-            sinkMetricData.registerMetricsForNumRecordsOutPerSecond(groupId, streamId, nodeId,
-                    "numRecordsOutPerSecond");
+            sinkMetricData = new SinkMetricData(groupId, streamId, nodeId, runtimeContext.getMetricGroup());
+            sinkMetricData.registerMetricsForDirtyBytes();
+            sinkMetricData.registerMetricsForDirtyRecords();
+            sinkMetricData.registerMetricsForNumBytesOut();
+            sinkMetricData.registerMetricsForNumRecordsOut();
+            sinkMetricData.registerMetricsForNumBytesOutPerSecond();
+            sinkMetricData.registerMetricsForNumRecordsOutPerSecond();
         }
     }
 

Reply via email to