This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ceab44de [INLONG-5533][Agent] Support structured output in the Kubernetes (#5534)
1ceab44de is described below

commit 1ceab44dea6257dec054999380d17d075fe8630f
Author: ganfengtan <[email protected]>
AuthorDate: Sat Aug 13 14:25:27 2022 +0800

    [INLONG-5533][Agent] Support structured output in the Kubernetes (#5534)
---
 .../apache/inlong/agent/constant/JobConstants.java |  3 +-
 .../inlong/agent/constant/KubernetesConstants.java | 12 +++-
 .../sources/reader/file/AbstractFileReader.java    | 32 ++++++++-
 .../sources/reader/file/FileReaderOperator.java    |  3 +-
 .../sources/reader/file/KubernetesFileReader.java  | 76 +++++++++++++++++-----
 .../FileDataUtils.java}                            | 36 +++++-----
 .../inlong/agent/plugin/utils/MetaDataUtils.java   | 67 +++++++++++++++++--
 7 files changed, 187 insertions(+), 42 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
index 04db5b097..d913c0257 100755
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
@@ -54,8 +54,9 @@ public class JobConstants extends CommonConstants {
     public static final String JOB_FILE_LINE_END_PATTERN = 
"job.fileJob.line.endPattern";
     public static final String JOB_FILE_CONTENT_COLLECT_TYPE = 
"job.fileJob.contentCollectType";
     public static final String JOB_FILE_META_ENV_LIST = "job.fileJob.envList";
-    public static final String JOB_FILE_DATA_SOURCE_COLUMN_SEPARATOR = 
"job.fileJob.dataSeparator";
     public static final String JOB_FILE_META_FILTER_BY_LABELS = 
"job.fileJob.filterMetaByLabels";
+    public static final String JOB_FILE_PROPERTIES = "job.fileJob.properties";
+    public static final String JOB_FILE_DATA_SOURCE_COLUMN_SEPARATOR = 
"job.fileJob.dataSeparator";
 
     //Binlog job
     public static final String JOB_DATABASE_USER = "job.binlogJob.user";
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/KubernetesConstants.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/KubernetesConstants.java
index dc76a39a9..e71ae4bae 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/KubernetesConstants.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/KubernetesConstants.java
@@ -18,7 +18,7 @@
 package org.apache.inlong.agent.constant;
 
 /**
- *  k8s information
+ * k8s information
  */
 public class KubernetesConstants {
 
@@ -32,5 +32,15 @@ public class KubernetesConstants {
     public static final String CONTAINER_NAME = "container.name";
     public static final String CONTAINER_ID = "container.id";
 
+    // k8s metadata
+    public static final String METADATA_CONTAINER_ID = "_container_id_";
+    public static final String METADATA_CONTAINER_NAME = "_container_name_";
+    public static final String METADATA_NAMESPACE = "_namespace_";
+    public static final String METADATA_POD_UID = "_pod_uid_";
+    public static final String METADATA_POD_NAME = "_pod_name_";
+    public static final String METADATA_POD_LABEL = "_pod_label_";
+    public static final String DATA_CONTENT = "_content_";
+    public static final String DATA_CONTENT_TIME = "_time_";
+
 
 }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/AbstractFileReader.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/AbstractFileReader.java
index d066ca2e7..f61cb4c28 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/AbstractFileReader.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/AbstractFileReader.java
@@ -17,16 +17,26 @@
 
 package org.apache.inlong.agent.plugin.sources.reader.file;
 
-import org.apache.inlong.agent.plugin.utils.MetaDataUtils;
+import com.google.gson.Gson;
+import org.apache.inlong.agent.plugin.utils.FileDataUtils;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
+import static 
org.apache.inlong.agent.constant.JobConstants.JOB_FILE_CONTENT_COLLECT_TYPE;
+import static 
org.apache.inlong.agent.constant.JobConstants.JOB_FILE_META_FILTER_BY_LABELS;
+import static 
org.apache.inlong.agent.constant.KubernetesConstants.DATA_CONTENT;
+import static 
org.apache.inlong.agent.constant.KubernetesConstants.DATA_CONTENT_TIME;
+
 /**
  * File reader template
  */
 public abstract class AbstractFileReader {
 
+    private static final Gson GSON = new Gson();
     public FileReaderOperator fileReaderOperator;
 
     public abstract void getData() throws Exception;
@@ -35,9 +45,25 @@ public abstract class AbstractFileReader {
         if (null == fileReaderOperator.metadata) {
             return;
         }
-
         List<String> lines = 
fileReaderOperator.stream.collect(Collectors.toList());
-        lines.forEach(data -> data = MetaDataUtils.concatString(data, 
fileReaderOperator.metadata));
+        if (fileReaderOperator.jobConf.hasKey(JOB_FILE_CONTENT_COLLECT_TYPE)) {
+            long timestamp = System.currentTimeMillis();
+            if (Objects.nonNull(fileReaderOperator.metadata)) {
+                lines = lines.stream().map(data -> {
+                    Map<String, String> mergeData = new 
HashMap<>(fileReaderOperator.metadata);
+                    mergeData.put(DATA_CONTENT, 
FileDataUtils.getK8sJsonLog(data));
+                    mergeData.put(DATA_CONTENT_TIME, 
String.valueOf(timestamp));
+                    return GSON.toJson(mergeData);
+                }).collect(Collectors.toList());
+            } else if 
(!fileReaderOperator.jobConf.hasKey(JOB_FILE_META_FILTER_BY_LABELS)) {
+                lines = lines.stream().map(data -> {
+                    Map<String, String> mergeData = new HashMap<>();
+                    mergeData.put(DATA_CONTENT, 
FileDataUtils.getK8sJsonLog(data));
+                    mergeData.put(DATA_CONTENT_TIME, 
String.valueOf(timestamp));
+                    return GSON.toJson(mergeData);
+                }).collect(Collectors.toList());
+            }
+        }
         fileReaderOperator.stream = lines.stream();
     }
 
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
index 5f448e3c4..f3d7d91bc 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
@@ -36,6 +36,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
@@ -58,7 +59,7 @@ public class FileReaderOperator extends AbstractReader {
     public int position;
     public String md5;
     public Stream<String> stream;
-    public String metadata;
+    public Map<String, String> metadata;
     public JobProfile jobConf;
     private Iterator<String> iterator;
     private long timeout;
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/KubernetesFileReader.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/KubernetesFileReader.java
index 9f0053351..04331278d 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/KubernetesFileReader.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/KubernetesFileReader.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.agent.plugin.sources.reader.file;
 
-import io.fabric8.kubernetes.api.model.ObjectMeta;
+import com.google.gson.Gson;
 import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.PodList;
 import io.fabric8.kubernetes.client.Config;
@@ -26,20 +26,31 @@ import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.KubernetesClientBuilder;
 import io.fabric8.kubernetes.client.dsl.MixedOperation;
 import io.fabric8.kubernetes.client.dsl.PodResource;
-import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.constant.CommonConstants;
 import org.apache.inlong.agent.plugin.utils.MetaDataUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.stream.Collectors;
 
+import static 
org.apache.inlong.agent.constant.KubernetesConstants.CONTAINER_ID;
+import static 
org.apache.inlong.agent.constant.KubernetesConstants.CONTAINER_NAME;
 import static org.apache.inlong.agent.constant.KubernetesConstants.HTTPS;
 import static 
org.apache.inlong.agent.constant.KubernetesConstants.KUBERNETES_SERVICE_HOST;
 import static 
org.apache.inlong.agent.constant.KubernetesConstants.KUBERNETES_SERVICE_PORT;
+import static 
org.apache.inlong.agent.constant.KubernetesConstants.METADATA_CONTAINER_ID;
+import static 
org.apache.inlong.agent.constant.KubernetesConstants.METADATA_CONTAINER_NAME;
+import static 
org.apache.inlong.agent.constant.KubernetesConstants.METADATA_NAMESPACE;
+import static 
org.apache.inlong.agent.constant.KubernetesConstants.METADATA_POD_LABEL;
+import static 
org.apache.inlong.agent.constant.KubernetesConstants.METADATA_POD_NAME;
+import static 
org.apache.inlong.agent.constant.KubernetesConstants.METADATA_POD_UID;
 import static org.apache.inlong.agent.constant.KubernetesConstants.NAMESPACE;
 import static org.apache.inlong.agent.constant.KubernetesConstants.POD_NAME;
 
@@ -49,6 +60,7 @@ import static 
org.apache.inlong.agent.constant.KubernetesConstants.POD_NAME;
 public final class KubernetesFileReader extends AbstractFileReader {
 
     private static final Logger log = 
LoggerFactory.getLogger(KubernetesFileReader.class);
+    private static final Gson GSON = new Gson();
 
     private KubernetesClient client;
 
@@ -60,13 +72,16 @@ public final class KubernetesFileReader extends 
AbstractFileReader {
         if (Objects.nonNull(client) && 
Objects.nonNull(fileReaderOperator.metadata)) {
             return;
         }
-        client = getKubernetesClient();
-        Map<String, String> k8sInfo = 
MetaDataUtils.getLogInfo(fileReaderOperator.file.getName());
-        ObjectMeta objectMeta = getPodMetadata(k8sInfo.get(NAMESPACE), 
k8sInfo.get(POD_NAME));
-        fileReaderOperator.metadata = Objects.nonNull(objectMeta) ? 
objectMeta.toString() : null;
+        try {
+            client = getKubernetesClient();
+        } catch (IOException e) {
+            log.error("Get k8s client error: ", e);
+        }
+        fileReaderOperator.metadata = 
getK8sMetadata(fileReaderOperator.jobConf);
     }
 
-    private KubernetesClient getKubernetesClient() {
+    // TODO only support default config in the POD
+    private KubernetesClient getKubernetesClient() throws IOException {
         String ip = System.getProperty(KUBERNETES_SERVICE_HOST);
         String port = System.getProperty(KUBERNETES_SERVICE_PORT);
         if (Objects.isNull(ip) || Objects.isNull(port)) {
@@ -74,8 +89,13 @@ public final class KubernetesFileReader extends 
AbstractFileReader {
             return null;
         }
         String maserUrl = 
HTTPS.concat(ip).concat(CommonConstants.AGENT_COLON).concat(port);
-        Config cofig = new ConfigBuilder().withMasterUrl(maserUrl).build();
-        return new KubernetesClientBuilder().withConfig(cofig).build();
+        Config config = new ConfigBuilder()
+                .withMasterUrl(maserUrl)
+                .withCaCertFile(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)
+                .withOauthToken(new String(
+                        Files.readAllBytes((new 
File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)).toPath())))
+                .build();
+        return new KubernetesClientBuilder().withConfig(config).build();
     }
 
     /**
@@ -92,11 +112,35 @@ public final class KubernetesFileReader extends 
AbstractFileReader {
     /**
      * get pod metadata by namespace and pod name
      */
-    public ObjectMeta getPodMetadata(String namespace, String podName) {
-        List<ObjectMeta> objectMetas = 
client.pods().list().getItems().stream().map(Pod::getMetadata)
-                .filter(data -> data.getName().equalsIgnoreCase(podName) && 
data.getNamespace()
-                        
.equalsIgnoreCase(namespace)).collect(Collectors.toList());
-        return CollectionUtils.isNotEmpty(objectMetas) ? objectMetas.get(0) : 
null;
+    public Map<String, String> getK8sMetadata(JobProfile jobConf) {
+        if (Objects.isNull(jobConf)) {
+            return null;
+        }
+        Map<String, String> k8sInfo = 
MetaDataUtils.getLogInfo(fileReaderOperator.file.getName());
+        if (k8sInfo.isEmpty()) {
+            return null;
+        }
+        List<String> namespaces = MetaDataUtils.getNamespace(jobConf);
+        if (Objects.isNull(namespaces) || namespaces.isEmpty()) {
+            return null;
+        }
+        if (!namespaces.contains(k8sInfo.get(NAMESPACE))) {
+            return null;
+        }
+        Pod pod = 
client.pods().inNamespace(k8sInfo.get(NAMESPACE)).withName(k8sInfo.get(POD_NAME)).get();
+        PodList podList = client.pods().inNamespace(k8sInfo.get(NAMESPACE))
+                .withLabels(MetaDataUtils.getPodLabels(jobConf)).list();
+        Map<String, String> metadata = new HashMap<>();
+        podList.getItems().forEach(data -> {
+            if (data.equals(pod)) {
+                metadata.put(METADATA_NAMESPACE, k8sInfo.get(NAMESPACE));
+                metadata.put(METADATA_CONTAINER_NAME, 
k8sInfo.get(CONTAINER_NAME));
+                metadata.put(METADATA_CONTAINER_ID, k8sInfo.get(CONTAINER_ID));
+                metadata.put(METADATA_POD_NAME, k8sInfo.get(POD_NAME));
+                metadata.put(METADATA_POD_UID, pod.getMetadata().getUid());
+                metadata.put(METADATA_POD_LABEL, 
GSON.toJson(pod.getMetadata().getLabels()));
+            }
+        });
+        return metadata;
     }
-
 }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/AbstractFileReader.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/FileDataUtils.java
similarity index 52%
copy from 
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/AbstractFileReader.java
copy to 
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/FileDataUtils.java
index d066ca2e7..4f892fa81 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/AbstractFileReader.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/FileDataUtils.java
@@ -15,30 +15,36 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.agent.plugin.sources.reader.file;
+package org.apache.inlong.agent.plugin.utils;
 
-import org.apache.inlong.agent.plugin.utils.MetaDataUtils;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import org.apache.commons.lang3.StringUtils;
 
-import java.util.List;
-import java.util.stream.Collectors;
+import java.lang.reflect.Type;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
- * File reader template
+ * File job utils
  */
-public abstract class AbstractFileReader {
+public class FileDataUtils {
 
-    public FileReaderOperator fileReaderOperator;
+    public static final String KUBERNETES_LOG = "log";
 
-    public abstract void getData() throws Exception;
+    private static final Gson GSON = new Gson();
 
-    public void mergeData(FileReaderOperator fileReaderOperator) {
-        if (null == fileReaderOperator.metadata) {
-            return;
+    /**
+     * Get standard log for k8s
+     */
+    public static String getK8sJsonLog(String log) {
+        if (!StringUtils.isNoneBlank(log)) {
+            return null;
         }
-
-        List<String> lines = 
fileReaderOperator.stream.collect(Collectors.toList());
-        lines.forEach(data -> data = MetaDataUtils.concatString(data, 
fileReaderOperator.metadata));
-        fileReaderOperator.stream = lines.stream();
+        Type type = new TypeToken<HashMap<Integer, String>>() {
+        }.getType();
+        Map<String, String> logJson = GSON.fromJson(log, type);
+        return logJson.get(KUBERNETES_LOG);
     }
 
 }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java
index d493ef726..7c3201f58 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java
@@ -17,12 +17,21 @@
 
 package org.apache.inlong.agent.plugin.utils;
 
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.constant.CommonConstants;
 
+import java.lang.reflect.Type;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
 
+import static 
org.apache.inlong.agent.constant.JobConstants.JOB_FILE_META_FILTER_BY_LABELS;
+import static 
org.apache.inlong.agent.constant.JobConstants.JOB_FILE_PROPERTIES;
 import static 
org.apache.inlong.agent.constant.KubernetesConstants.CONTAINER_ID;
 import static 
org.apache.inlong.agent.constant.KubernetesConstants.CONTAINER_NAME;
 import static org.apache.inlong.agent.constant.KubernetesConstants.NAMESPACE;
@@ -33,9 +42,11 @@ import static 
org.apache.inlong.agent.constant.KubernetesConstants.POD_NAME;
  */
 public class MetaDataUtils {
 
+    private static final Gson GSON = new Gson();
+
     /**
      * standard log for k8s
-     * 
+     *
      * get pod_name,namespace,container_name,container_id
      */
     public static Map<String, String> getLogInfo(String fileName) {
@@ -53,10 +64,56 @@ public class MetaDataUtils {
         return podInf;
     }
 
-    public static String concatString(String str1, String str2) {
-        if (!StringUtils.isNoneBlank(str2)) {
-            return str1;
+    /**
+     * standard log for k8s
+     *
+     * get labels of pod
+     */
+    public static Map<String, String> getPodLabels(JobProfile jobProfile) {
+        if (Objects.isNull(jobProfile) || 
!jobProfile.hasKey(JOB_FILE_META_FILTER_BY_LABELS)) {
+            return null;
+        }
+        String labels = jobProfile.get(JOB_FILE_META_FILTER_BY_LABELS);
+        Type type = new TypeToken<HashMap<Integer, String>>() {
+        }.getType();
+        return GSON.fromJson(labels, type);
+    }
+
+    public static List<String> getNamespace(JobProfile jobProfile) {
+        if (Objects.isNull(jobProfile) || 
!jobProfile.hasKey(JOB_FILE_PROPERTIES)) {
+            return null;
+        }
+        String property = jobProfile.get(JOB_FILE_PROPERTIES);
+        Type type = new TypeToken<HashMap<Integer, String>>() {
+        }.getType();
+        Map<String, String> properties = GSON.fromJson(property, type);
+        return properties.keySet().stream().map(data -> {
+            if (data.contains(NAMESPACE)) {
+                return properties.get(data);
+            }
+            return null;
+        }).filter(Objects::nonNull).collect(Collectors.toList());
+    }
+
+    /**
+     * standard log for k8s
+     *
+     * get name of pod
+     */
+    public static String getPodName(JobProfile jobProfile) {
+        if (Objects.isNull(jobProfile) || 
!jobProfile.hasKey(JOB_FILE_PROPERTIES)) {
+            return null;
         }
-        return str1.concat("\n").concat(str2);
+        String property = jobProfile.get(JOB_FILE_PROPERTIES);
+        Type type = new TypeToken<HashMap<Integer, String>>() {
+        }.getType();
+        Map<String, String> properties = GSON.fromJson(property, type);
+        List<String> podName = properties.keySet().stream().map(data -> {
+            if (data.contains(POD_NAME)) {
+                return properties.get(data);
+            }
+            return null;
+        }).filter(Objects::nonNull).collect(Collectors.toList());
+        return podName.isEmpty() ? null : podName.get(0);
     }
 }

Reply via email to