This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new a904605ad [INLONG-6675][Sort] Add metrics.audit.proxy.hosts option to support report metrics (#6692)
a904605ad is described below

commit a904605ade244555cd13039c49de8f395c8a4ca7
Author: wangpeix <[email protected]>
AuthorDate: Thu Dec 1 17:20:26 2022 +0800

    [INLONG-6675][Sort] Add metrics.audit.proxy.hosts option to support report metrics (#6692)
---
 .../inlong/manager/plugin/flink/FlinkService.java  | 28 +++-------------------
 .../main/java/org/apache/inlong/sort/Entrance.java |  6 ++++-
 2 files changed, 8 insertions(+), 26 deletions(-)

diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
index 6afd7df53..2bb46ae27 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
@@ -32,7 +32,6 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkConfig;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
 import org.apache.inlong.manager.plugin.flink.dto.StopWithSavepointRequest;
@@ -196,7 +195,7 @@ public class FlinkService {
     private String submitJobBySavepoint(FlinkInfo flinkInfo, 
SavepointRestoreSettings settings) throws Exception {
         String localJarPath = flinkInfo.getLocalJarPath();
         final File jarFile = new File(localJarPath);
-        final String[] programArgs = genProgramArgsV2(flinkInfo, flinkConfig);
+        final String[] programArgs = genProgramArgs(flinkInfo, flinkConfig);
 
         List<URL> connectorJars = 
flinkInfo.getConnectorJarPaths().stream().map(p -> {
             try {
@@ -254,30 +253,7 @@ public class FlinkService {
     /**
      * Build the program of the Flink job.
      */
-    @Deprecated
     private String[] genProgramArgs(FlinkInfo flinkInfo, FlinkConfig 
flinkConfig) {
-        List<String> list = new ArrayList<>();
-        list.add("-cluster-id");
-        list.add(flinkInfo.getJobName());
-        list.add("-dataflow.info.file");
-        list.add(flinkInfo.getLocalConfPath());
-        list.add("-source.type");
-        list.add(flinkInfo.getSourceType());
-        list.add("-sink.type");
-        list.add(flinkInfo.getSinkType());
-        list.add("-metrics.audit.proxy.hosts");
-        list.add(flinkConfig.getAuditProxyHosts());
-        // TODO Support more than one stream with one group
-        if (flinkInfo.getInlongStreamInfoList() != null
-                && !flinkInfo.getInlongStreamInfoList().isEmpty()) {
-            InlongStreamInfo inlongStreamInfo = 
flinkInfo.getInlongStreamInfoList().get(0);
-            list.add("-job.orderly.output");
-            list.add(String.valueOf(inlongStreamInfo.getSyncSend()));
-        }
-        return list.toArray(new String[0]);
-    }
-
-    private String[] genProgramArgsV2(FlinkInfo flinkInfo, FlinkConfig 
flinkConfig) {
         List<String> list = new ArrayList<>();
         list.add("-cluster-id");
         list.add(flinkInfo.getJobName());
@@ -285,6 +261,8 @@ public class FlinkService {
         list.add(flinkInfo.getLocalConfPath());
         list.add("-checkpoint.interval");
         list.add("60000");
+        list.add("-metrics.audit.proxy.hosts");
+        list.add(flinkConfig.getAuditProxyHosts());
         return list.toArray(new String[0]);
     }
 
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java
index f6f91eac4..fbe01d881 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java
@@ -58,6 +58,10 @@ public class Entrance {
         Parser parser;
         if (StringUtils.isEmpty(sqlFile)) {
             final GroupInfo groupInfo = 
getGroupInfoFromFile(config.getString(Constants.GROUP_INFO_FILE));
+            if 
(StringUtils.isNotEmpty(config.getString(Constants.METRICS_AUDIT_PROXY_HOSTS))) 
{
+                
groupInfo.getProperties().putIfAbsent(Constants.METRICS_AUDIT_PROXY_HOSTS.key(),
+                        config.getString(Constants.METRICS_AUDIT_PROXY_HOSTS));
+            }
             parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
         } else {
             String statements = getStatementSetFromFile(sqlFile);
@@ -69,7 +73,7 @@ public class Entrance {
     }
 
     private static String getStatementSetFromFile(String fileName) throws 
IOException {
-        return Files.toString(new File(fileName), StandardCharsets.UTF_8);
+        return Files.asCharSource(new File(fileName), 
StandardCharsets.UTF_8).read();
     }
 
     private static GroupInfo getGroupInfoFromFile(String fileName) throws 
IOException {

Reply via email to