This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 1ceab44de [INLONG-5533][Agent] Support structured output in the
Kubernetes (#5534)
1ceab44de is described below
commit 1ceab44dea6257dec054999380d17d075fe8630f
Author: ganfengtan <[email protected]>
AuthorDate: Sat Aug 13 14:25:27 2022 +0800
[INLONG-5533][Agent] Support structured output in the Kubernetes (#5534)
---
.../apache/inlong/agent/constant/JobConstants.java | 3 +-
.../inlong/agent/constant/KubernetesConstants.java | 12 +++-
.../sources/reader/file/AbstractFileReader.java | 32 ++++++++-
.../sources/reader/file/FileReaderOperator.java | 3 +-
.../sources/reader/file/KubernetesFileReader.java | 76 +++++++++++++++++-----
.../FileDataUtils.java} | 36 +++++-----
.../inlong/agent/plugin/utils/MetaDataUtils.java | 67 +++++++++++++++++--
7 files changed, 187 insertions(+), 42 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
index 04db5b097..d913c0257 100755
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
@@ -54,8 +54,9 @@ public class JobConstants extends CommonConstants {
public static final String JOB_FILE_LINE_END_PATTERN =
"job.fileJob.line.endPattern";
public static final String JOB_FILE_CONTENT_COLLECT_TYPE =
"job.fileJob.contentCollectType";
public static final String JOB_FILE_META_ENV_LIST = "job.fileJob.envList";
- public static final String JOB_FILE_DATA_SOURCE_COLUMN_SEPARATOR =
"job.fileJob.dataSeparator";
public static final String JOB_FILE_META_FILTER_BY_LABELS =
"job.fileJob.filterMetaByLabels";
+ public static final String JOB_FILE_PROPERTIES = "job.fileJob.properties";
+ public static final String JOB_FILE_DATA_SOURCE_COLUMN_SEPARATOR =
"job.fileJob.dataSeparator";
//Binlog job
public static final String JOB_DATABASE_USER = "job.binlogJob.user";
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/KubernetesConstants.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/KubernetesConstants.java
index dc76a39a9..e71ae4bae 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/KubernetesConstants.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/KubernetesConstants.java
@@ -18,7 +18,7 @@
package org.apache.inlong.agent.constant;
/**
- * k8s information
+ * k8s information
*/
public class KubernetesConstants {
@@ -32,5 +32,15 @@ public class KubernetesConstants {
public static final String CONTAINER_NAME = "container.name";
public static final String CONTAINER_ID = "container.id";
+ // k8s metadata
+ public static final String METADATA_CONTAINER_ID = "_container_id_";
+ public static final String METADATA_CONTAINER_NAME = "_container_name_";
+ public static final String METADATA_NAMESPACE = "_namespace_";
+ public static final String METADATA_POD_UID = "_pod_uid_";
+ public static final String METADATA_POD_NAME = "_pod_name_";
+ public static final String METADATA_POD_LABEL = "_pod_label_";
+ public static final String DATA_CONTENT = "_content_";
+ public static final String DATA_CONTENT_TIME = "_time_";
+
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/AbstractFileReader.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/AbstractFileReader.java
index d066ca2e7..f61cb4c28 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/AbstractFileReader.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/AbstractFileReader.java
@@ -17,16 +17,26 @@
package org.apache.inlong.agent.plugin.sources.reader.file;
-import org.apache.inlong.agent.plugin.utils.MetaDataUtils;
+import com.google.gson.Gson;
+import org.apache.inlong.agent.plugin.utils.FileDataUtils;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Objects;
import java.util.stream.Collectors;
+import static
org.apache.inlong.agent.constant.JobConstants.JOB_FILE_CONTENT_COLLECT_TYPE;
+import static
org.apache.inlong.agent.constant.JobConstants.JOB_FILE_META_FILTER_BY_LABELS;
+import static
org.apache.inlong.agent.constant.KubernetesConstants.DATA_CONTENT;
+import static
org.apache.inlong.agent.constant.KubernetesConstants.DATA_CONTENT_TIME;
+
/**
* File reader template
*/
public abstract class AbstractFileReader {
+ private static final Gson GSON = new Gson();
public FileReaderOperator fileReaderOperator;
public abstract void getData() throws Exception;
@@ -35,9 +45,25 @@ public abstract class AbstractFileReader {
if (null == fileReaderOperator.metadata) {
return;
}
-
List<String> lines =
fileReaderOperator.stream.collect(Collectors.toList());
- lines.forEach(data -> data = MetaDataUtils.concatString(data,
fileReaderOperator.metadata));
+ if (fileReaderOperator.jobConf.hasKey(JOB_FILE_CONTENT_COLLECT_TYPE)) {
+ long timestamp = System.currentTimeMillis();
+ if (Objects.nonNull(fileReaderOperator.metadata)) {
+ lines = lines.stream().map(data -> {
+ Map<String, String> mergeData = new
HashMap<>(fileReaderOperator.metadata);
+ mergeData.put(DATA_CONTENT,
FileDataUtils.getK8sJsonLog(data));
+ mergeData.put(DATA_CONTENT_TIME,
String.valueOf(timestamp));
+ return GSON.toJson(mergeData);
+ }).collect(Collectors.toList());
+ } else if
(!fileReaderOperator.jobConf.hasKey(JOB_FILE_META_FILTER_BY_LABELS)) {
+ lines = lines.stream().map(data -> {
+ Map<String, String> mergeData = new HashMap<>();
+ mergeData.put(DATA_CONTENT,
FileDataUtils.getK8sJsonLog(data));
+ mergeData.put(DATA_CONTENT_TIME,
String.valueOf(timestamp));
+ return GSON.toJson(mergeData);
+ }).collect(Collectors.toList());
+ }
+ }
fileReaderOperator.stream = lines.stream();
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
index 5f448e3c4..f3d7d91bc 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
@@ -36,6 +36,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
@@ -58,7 +59,7 @@ public class FileReaderOperator extends AbstractReader {
public int position;
public String md5;
public Stream<String> stream;
- public String metadata;
+ public Map<String, String> metadata;
public JobProfile jobConf;
private Iterator<String> iterator;
private long timeout;
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/KubernetesFileReader.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/KubernetesFileReader.java
index 9f0053351..04331278d 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/KubernetesFileReader.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/KubernetesFileReader.java
@@ -17,7 +17,7 @@
package org.apache.inlong.agent.plugin.sources.reader.file;
-import io.fabric8.kubernetes.api.model.ObjectMeta;
+import com.google.gson.Gson;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.client.Config;
@@ -26,20 +26,31 @@ import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import io.fabric8.kubernetes.client.dsl.MixedOperation;
import io.fabric8.kubernetes.client.dsl.PodResource;
-import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.constant.CommonConstants;
import org.apache.inlong.agent.plugin.utils.MetaDataUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.stream.Collectors;
+import static
org.apache.inlong.agent.constant.KubernetesConstants.CONTAINER_ID;
+import static
org.apache.inlong.agent.constant.KubernetesConstants.CONTAINER_NAME;
import static org.apache.inlong.agent.constant.KubernetesConstants.HTTPS;
import static
org.apache.inlong.agent.constant.KubernetesConstants.KUBERNETES_SERVICE_HOST;
import static
org.apache.inlong.agent.constant.KubernetesConstants.KUBERNETES_SERVICE_PORT;
+import static
org.apache.inlong.agent.constant.KubernetesConstants.METADATA_CONTAINER_ID;
+import static
org.apache.inlong.agent.constant.KubernetesConstants.METADATA_CONTAINER_NAME;
+import static
org.apache.inlong.agent.constant.KubernetesConstants.METADATA_NAMESPACE;
+import static
org.apache.inlong.agent.constant.KubernetesConstants.METADATA_POD_LABEL;
+import static
org.apache.inlong.agent.constant.KubernetesConstants.METADATA_POD_NAME;
+import static
org.apache.inlong.agent.constant.KubernetesConstants.METADATA_POD_UID;
import static org.apache.inlong.agent.constant.KubernetesConstants.NAMESPACE;
import static org.apache.inlong.agent.constant.KubernetesConstants.POD_NAME;
@@ -49,6 +60,7 @@ import static
org.apache.inlong.agent.constant.KubernetesConstants.POD_NAME;
public final class KubernetesFileReader extends AbstractFileReader {
private static final Logger log =
LoggerFactory.getLogger(KubernetesFileReader.class);
+ private static final Gson GSON = new Gson();
private KubernetesClient client;
@@ -60,13 +72,16 @@ public final class KubernetesFileReader extends
AbstractFileReader {
if (Objects.nonNull(client) &&
Objects.nonNull(fileReaderOperator.metadata)) {
return;
}
- client = getKubernetesClient();
- Map<String, String> k8sInfo =
MetaDataUtils.getLogInfo(fileReaderOperator.file.getName());
- ObjectMeta objectMeta = getPodMetadata(k8sInfo.get(NAMESPACE),
k8sInfo.get(POD_NAME));
- fileReaderOperator.metadata = Objects.nonNull(objectMeta) ?
objectMeta.toString() : null;
+ try {
+ client = getKubernetesClient();
+ } catch (IOException e) {
+ log.error("Get k8s client error: ", e);
+ }
+ fileReaderOperator.metadata =
getK8sMetadata(fileReaderOperator.jobConf);
}
- private KubernetesClient getKubernetesClient() {
+ // TODO only support default config in the POD
+ private KubernetesClient getKubernetesClient() throws IOException {
String ip = System.getProperty(KUBERNETES_SERVICE_HOST);
String port = System.getProperty(KUBERNETES_SERVICE_PORT);
if (Objects.isNull(ip) || Objects.isNull(port)) {
@@ -74,8 +89,13 @@ public final class KubernetesFileReader extends
AbstractFileReader {
return null;
}
String maserUrl =
HTTPS.concat(ip).concat(CommonConstants.AGENT_COLON).concat(port);
- Config cofig = new ConfigBuilder().withMasterUrl(maserUrl).build();
- return new KubernetesClientBuilder().withConfig(cofig).build();
+ Config config = new ConfigBuilder()
+ .withMasterUrl(maserUrl)
+ .withCaCertFile(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)
+ .withOauthToken(new String(
+ Files.readAllBytes((new
File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)).toPath())))
+ .build();
+ return new KubernetesClientBuilder().withConfig(config).build();
}
/**
@@ -92,11 +112,35 @@ public final class KubernetesFileReader extends
AbstractFileReader {
/**
* get pod metadata by namespace and pod name
*/
- public ObjectMeta getPodMetadata(String namespace, String podName) {
- List<ObjectMeta> objectMetas =
client.pods().list().getItems().stream().map(Pod::getMetadata)
- .filter(data -> data.getName().equalsIgnoreCase(podName) &&
data.getNamespace()
-
.equalsIgnoreCase(namespace)).collect(Collectors.toList());
- return CollectionUtils.isNotEmpty(objectMetas) ? objectMetas.get(0) :
null;
+ public Map<String, String> getK8sMetadata(JobProfile jobConf) {
+     // Result contract:
+     //   null      -> no job config, no usable k8s client, file name carries no k8s info,
+     //                or the file's namespace is not among the configured namespaces
+     //   empty map -> the pod was not found or is not selected by the configured labels
+     //   filled    -> namespace, container name/id, pod name/uid and pod labels (as JSON)
+     if (Objects.isNull(jobConf)) {
+         return null;
+     }
+     // getKubernetesClient() returns null when the service host/port properties are missing,
+     // and getData() only logs an IOException, so the client may still be unset here.
+     if (Objects.isNull(client)) {
+         return null;
+     }
+     Map<String, String> k8sInfo = MetaDataUtils.getLogInfo(fileReaderOperator.file.getName());
+     if (k8sInfo.isEmpty()) {
+         return null;
+     }
+     String namespace = k8sInfo.get(NAMESPACE);
+     List<String> namespaces = MetaDataUtils.getNamespace(jobConf);
+     // A null or empty namespace list can never contain the namespace, so one check covers all.
+     if (Objects.isNull(namespaces) || !namespaces.contains(namespace)) {
+         return null;
+     }
+     Map<String, String> metadata = new HashMap<>();
+     Pod pod = client.pods().inNamespace(namespace).withName(k8sInfo.get(POD_NAME)).get();
+     if (Objects.isNull(pod)) {
+         // Nothing can match a missing pod; skip the label query.
+         return metadata;
+     }
+     // NOTE(review): getPodLabels returns null when no label filter is configured;
+     // confirm that withLabels tolerates a null map.
+     PodList podList = client.pods().inNamespace(namespace)
+             .withLabels(MetaDataUtils.getPodLabels(jobConf)).list();
+     boolean selectedByLabels = podList.getItems().stream().anyMatch(item -> item.equals(pod));
+     if (selectedByLabels) {
+         metadata.put(METADATA_NAMESPACE, namespace);
+         metadata.put(METADATA_CONTAINER_NAME, k8sInfo.get(CONTAINER_NAME));
+         metadata.put(METADATA_CONTAINER_ID, k8sInfo.get(CONTAINER_ID));
+         metadata.put(METADATA_POD_NAME, k8sInfo.get(POD_NAME));
+         metadata.put(METADATA_POD_UID, pod.getMetadata().getUid());
+         metadata.put(METADATA_POD_LABEL, GSON.toJson(pod.getMetadata().getLabels()));
+     }
+     return metadata;
}
-
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/AbstractFileReader.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/FileDataUtils.java
similarity index 52%
copy from
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/AbstractFileReader.java
copy to
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/FileDataUtils.java
index d066ca2e7..4f892fa81 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/AbstractFileReader.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/FileDataUtils.java
@@ -15,30 +15,36 @@
* limitations under the License.
*/
-package org.apache.inlong.agent.plugin.sources.reader.file;
+package org.apache.inlong.agent.plugin.utils;
-import org.apache.inlong.agent.plugin.utils.MetaDataUtils;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import org.apache.commons.lang3.StringUtils;
-import java.util.List;
-import java.util.stream.Collectors;
+import java.lang.reflect.Type;
+import java.util.HashMap;
+import java.util.Map;
/**
- * File reader template
+ * File job utils
*/
-public abstract class AbstractFileReader {
+public class FileDataUtils {
- public FileReaderOperator fileReaderOperator;
+ public static final String KUBERNETES_LOG = "log";
- public abstract void getData() throws Exception;
+ private static final Gson GSON = new Gson();
- public void mergeData(FileReaderOperator fileReaderOperator) {
- if (null == fileReaderOperator.metadata) {
- return;
+ /**
+  * Extract the raw log text from a JSON-formatted k8s log line.
+  *
+  * <p>The line is parsed as a flat JSON object of string values and the value stored under
+  * {@link #KUBERNETES_LOG} is returned. Malformed JSON is not handled here: Gson's exception
+  * propagates to the caller.
+  *
+  * @param log one line read from the container log file; may be {@code null} or blank
+  * @return the value of the {@code "log"} field, or {@code null} when the line is blank,
+  *         parses to nothing, or has no such field
+  */
+ public static String getK8sJsonLog(String log) {
+     if (StringUtils.isBlank(log)) {
+         return null;
}
-
- List<String> lines =
fileReaderOperator.stream.collect(Collectors.toList());
- lines.forEach(data -> data = MetaDataUtils.concatString(data,
fileReaderOperator.metadata));
- fileReaderOperator.stream = lines.stream();
+     // Field names such as "log" are strings; an Integer key type makes Gson try to parse
+     // each name as a number and fail on every real log line.
+     Type type = new TypeToken<HashMap<String, String>>() {
+     }.getType();
+     Map<String, String> logJson = GSON.fromJson(log, type);
+     return logJson == null ? null : logJson.get(KUBERNETES_LOG);
}
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java
index d493ef726..7c3201f58 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java
@@ -17,12 +17,21 @@
package org.apache.inlong.agent.plugin.utils;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.constant.CommonConstants;
+import java.lang.reflect.Type;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import static
org.apache.inlong.agent.constant.JobConstants.JOB_FILE_META_FILTER_BY_LABELS;
+import static
org.apache.inlong.agent.constant.JobConstants.JOB_FILE_PROPERTIES;
import static
org.apache.inlong.agent.constant.KubernetesConstants.CONTAINER_ID;
import static
org.apache.inlong.agent.constant.KubernetesConstants.CONTAINER_NAME;
import static org.apache.inlong.agent.constant.KubernetesConstants.NAMESPACE;
@@ -33,9 +42,11 @@ import static
org.apache.inlong.agent.constant.KubernetesConstants.POD_NAME;
*/
public class MetaDataUtils {
+ private static final Gson GSON = new Gson();
+
/**
* standard log for k8s
- *
+ *
* get pod_name,namespace,container_name,container_id
*/
public static Map<String, String> getLogInfo(String fileName) {
@@ -53,10 +64,56 @@ public class MetaDataUtils {
return podInf;
}
- public static String concatString(String str1, String str2) {
- if (!StringUtils.isNoneBlank(str2)) {
- return str1;
+ /**
+  * Read the pod-label filter configured for a file job.
+  *
+  * <p>The value stored under {@code JOB_FILE_META_FILTER_BY_LABELS} is parsed as a flat JSON
+  * object of string values (presumably label name to label value; the result is handed to
+  * the k8s client as a label selector by {@code KubernetesFileReader}).
+  *
+  * @param jobProfile job configuration; may be {@code null}
+  * @return the parsed labels, or {@code null} when the profile is {@code null} or carries
+  *         no label filter
+  */
+ public static Map<String, String> getPodLabels(JobProfile jobProfile) {
+     if (Objects.isNull(jobProfile) || !jobProfile.hasKey(JOB_FILE_META_FILTER_BY_LABELS)) {
+         return null;
+     }
+     String labels = jobProfile.get(JOB_FILE_META_FILTER_BY_LABELS);
+     // Keys must be deserialized as String: an Integer key type makes Gson reject every
+     // non-numeric label name.
+     Type type = new TypeToken<HashMap<String, String>>() {
+     }.getType();
+     return GSON.fromJson(labels, type);
+ }
+
+ /**
+  * Collect the namespaces configured in the file-job properties.
+  *
+  * <p>The value stored under {@code JOB_FILE_PROPERTIES} is parsed as a flat JSON object of
+  * string values; the value of every entry whose key contains {@code NAMESPACE} is returned.
+  *
+  * @param jobProfile job configuration; may be {@code null}
+  * @return the non-null values of all matching entries (possibly empty), or {@code null}
+  *         when the profile is {@code null}, has no properties, or the properties parse
+  *         to nothing
+  */
+ public static List<String> getNamespace(JobProfile jobProfile) {
+     if (Objects.isNull(jobProfile) || !jobProfile.hasKey(JOB_FILE_PROPERTIES)) {
+         return null;
+     }
+     String property = jobProfile.get(JOB_FILE_PROPERTIES);
+     // Property names are strings; an Integer key type makes Gson fail on them.
+     Type type = new TypeToken<HashMap<String, String>>() {
+     }.getType();
+     Map<String, String> properties = GSON.fromJson(property, type);
+     if (Objects.isNull(properties)) {
+         return null;
+     }
+     return properties.entrySet().stream()
+             .filter(entry -> entry.getKey().contains(NAMESPACE))
+             .map(Map.Entry::getValue)
+             .filter(Objects::nonNull)
+             .collect(Collectors.toList());
+ }
+
+ /**
+  * Read the pod name configured in the file-job properties.
+  *
+  * <p>The value stored under {@code JOB_FILE_PROPERTIES} is parsed as a flat JSON object of
+  * string values; the value of the first entry whose key contains {@code POD_NAME} is
+  * returned.
+  *
+  * @param jobProfile job configuration; may be {@code null}
+  * @return the first non-null matching value, or {@code null} when the profile is
+  *         {@code null}, has no properties, or no entry matches
+  */
+ public static String getPodName(JobProfile jobProfile) {
+     if (Objects.isNull(jobProfile) || !jobProfile.hasKey(JOB_FILE_PROPERTIES)) {
+         return null;
}
- return str1.concat("\n").concat(str2);
+     String property = jobProfile.get(JOB_FILE_PROPERTIES);
+     // Property names are strings; an Integer key type makes Gson fail on them.
+     Type type = new TypeToken<HashMap<String, String>>() {
+     }.getType();
+     Map<String, String> properties = GSON.fromJson(property, type);
+     if (Objects.isNull(properties)) {
+         return null;
+     }
+     return properties.entrySet().stream()
+             .filter(entry -> entry.getKey().contains(POD_NAME))
+             .map(Map.Entry::getValue)
+             .filter(Objects::nonNull)
+             .findFirst()
+             .orElse(null);
}
}