This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new a904605ad [INLONG-6675][Sort] Add metrics.audit.proxy.hosts option to
support report metrics (#6692)
a904605ad is described below
commit a904605ade244555cd13039c49de8f395c8a4ca7
Author: wangpeix <[email protected]>
AuthorDate: Thu Dec 1 17:20:26 2022 +0800
[INLONG-6675][Sort] Add metrics.audit.proxy.hosts option to support report
metrics (#6692)
---
.../inlong/manager/plugin/flink/FlinkService.java | 28 +++-------------------
.../main/java/org/apache/inlong/sort/Entrance.java | 6 ++++-
2 files changed, 8 insertions(+), 26 deletions(-)
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
index 6afd7df53..2bb46ae27 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
@@ -32,7 +32,6 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.plugin.flink.dto.FlinkConfig;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.plugin.flink.dto.StopWithSavepointRequest;
@@ -196,7 +195,7 @@ public class FlinkService {
private String submitJobBySavepoint(FlinkInfo flinkInfo,
SavepointRestoreSettings settings) throws Exception {
String localJarPath = flinkInfo.getLocalJarPath();
final File jarFile = new File(localJarPath);
- final String[] programArgs = genProgramArgsV2(flinkInfo, flinkConfig);
+ final String[] programArgs = genProgramArgs(flinkInfo, flinkConfig);
List<URL> connectorJars =
flinkInfo.getConnectorJarPaths().stream().map(p -> {
try {
@@ -254,30 +253,7 @@ public class FlinkService {
/**
* Build the program of the Flink job.
*/
- @Deprecated
private String[] genProgramArgs(FlinkInfo flinkInfo, FlinkConfig
flinkConfig) {
- List<String> list = new ArrayList<>();
- list.add("-cluster-id");
- list.add(flinkInfo.getJobName());
- list.add("-dataflow.info.file");
- list.add(flinkInfo.getLocalConfPath());
- list.add("-source.type");
- list.add(flinkInfo.getSourceType());
- list.add("-sink.type");
- list.add(flinkInfo.getSinkType());
- list.add("-metrics.audit.proxy.hosts");
- list.add(flinkConfig.getAuditProxyHosts());
- // TODO Support more than one stream with one group
- if (flinkInfo.getInlongStreamInfoList() != null
- && !flinkInfo.getInlongStreamInfoList().isEmpty()) {
- InlongStreamInfo inlongStreamInfo = flinkInfo.getInlongStreamInfoList().get(0);
- list.add("-job.orderly.output");
- list.add(String.valueOf(inlongStreamInfo.getSyncSend()));
- }
- return list.toArray(new String[0]);
- }
-
- private String[] genProgramArgsV2(FlinkInfo flinkInfo, FlinkConfig flinkConfig) {
List<String> list = new ArrayList<>();
list.add("-cluster-id");
list.add(flinkInfo.getJobName());
@@ -285,6 +261,8 @@ public class FlinkService {
list.add(flinkInfo.getLocalConfPath());
list.add("-checkpoint.interval");
list.add("60000");
+ list.add("-metrics.audit.proxy.hosts");
+ list.add(flinkConfig.getAuditProxyHosts());
return list.toArray(new String[0]);
}
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java
index f6f91eac4..fbe01d881 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java
@@ -58,6 +58,10 @@ public class Entrance {
Parser parser;
if (StringUtils.isEmpty(sqlFile)) {
final GroupInfo groupInfo =
getGroupInfoFromFile(config.getString(Constants.GROUP_INFO_FILE));
+ if (StringUtils.isNotEmpty(config.getString(Constants.METRICS_AUDIT_PROXY_HOSTS))) {
+ groupInfo.getProperties().putIfAbsent(Constants.METRICS_AUDIT_PROXY_HOSTS.key(),
+ config.getString(Constants.METRICS_AUDIT_PROXY_HOSTS));
+ }
parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
} else {
String statements = getStatementSetFromFile(sqlFile);
@@ -69,7 +73,7 @@ public class Entrance {
}
private static String getStatementSetFromFile(String fileName) throws
IOException {
- return Files.toString(new File(fileName), StandardCharsets.UTF_8);
+ return Files.asCharSource(new File(fileName), StandardCharsets.UTF_8).read();
}
private static GroupInfo getGroupInfoFromFile(String fileName) throws
IOException {