EMsnap commented on code in PR #9113:
URL: https://github.com/apache/inlong/pull/9113#discussion_r1371619552


##########
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java:
##########
@@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.pojo;
+
+import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.pojo.FileTask.FileTaskConfig;
+import org.apache.inlong.agent.pojo.FileTask.Line;
+import org.apache.inlong.common.constant.MQType;
+import org.apache.inlong.common.enums.TaskTypeEnum;
+import org.apache.inlong.common.pojo.agent.DataConfig;
+
+import com.google.gson.Gson;
+import lombok.Data;
+
+import static java.util.Objects.requireNonNull;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT;
+import static org.apache.inlong.agent.constant.TaskConstants.SYNC_SEND_OPEN;
+
+@Data
+public class TaskProfileDto {
+
+    public static final String DEFAULT_FILE_TASK = 
"org.apache.inlong.agent.plugin.task.filecollect.LogFileCollectTask";
+    public static final String DEFAULT_CHANNEL = 
"org.apache.inlong.agent.plugin.channel.MemoryChannel";
+    public static final String MANAGER_JOB = "MANAGER_JOB";
+    public static final String DEFAULT_DATAPROXY_SINK = 
"org.apache.inlong.agent.plugin.sinks.ProxySink";
+    public static final String FILE_DATAPROXY_SINK =
+            "org.apache.inlong.agent.plugin.sinks.filecollect.ProxySink";
+    public static final String PULSAR_SINK = 
"org.apache.inlong.agent.plugin.sinks.PulsarSink";
+    public static final String KAFKA_SINK = 
"org.apache.inlong.agent.plugin.sinks.KafkaSink";
+
+    /**
+     * file source
+     */
+    public static final String DEFAULT_SOURCE = 
"org.apache.inlong.agent.plugin.sources.LogFileSource";
+    /**
+     * binlog source
+     */
+    public static final String BINLOG_SOURCE = 
"org.apache.inlong.agent.plugin.sources.BinlogSource";
+    /**
+     * kafka source
+     */
+    public static final String KAFKA_SOURCE = 
"org.apache.inlong.agent.plugin.sources.KafkaSource";
+    /**
+     * PostgreSQL source
+     */
+    public static final String POSTGRESQL_SOURCE = 
"org.apache.inlong.agent.plugin.sources.PostgreSQLSource";
+    /**
+     * mongo source
+     */
+    public static final String MONGO_SOURCE = 
"org.apache.inlong.agent.plugin.sources.MongoDBSource";
+    /**
+     * oracle source
+     */
+    public static final String ORACLE_SOURCE = 
"org.apache.inlong.agent.plugin.sources.OracleSource";
+    /**
+     * redis source
+     */
+    public static final String REDIS_SOURCE = 
"org.apache.inlong.agent.plugin.sources.RedisSource";
+    /**
+     * mqtt source
+     */
+    public static final String MQTT_SOURCE = 
"org.apache.inlong.agent.plugin.sources.MqttSource";
+    /**
+     * sqlserver source
+     */
+    public static final String SQLSERVER_SOURCE = 
"org.apache.inlong.agent.plugin.sources.SQLServerSource";
+
+    private static final Gson GSON = new Gson();
+
+    private Task task;
+    private Proxy proxy;
+
+    private static BinlogJob getBinlogJob(DataConfig dataConfigs) {
+        BinlogJob.BinlogJobTaskConfig binlogJobTaskConfig = 
GSON.fromJson(dataConfigs.getExtParams(),
+                BinlogJob.BinlogJobTaskConfig.class);
+
+        BinlogJob binlogJob = new BinlogJob();
+        binlogJob.setHostname(binlogJobTaskConfig.getHostname());
+        binlogJob.setPassword(binlogJobTaskConfig.getPassword());
+        binlogJob.setUser(binlogJobTaskConfig.getUser());
+        binlogJob.setTableWhiteList(binlogJobTaskConfig.getTableWhiteList());
+        
binlogJob.setDatabaseWhiteList(binlogJobTaskConfig.getDatabaseWhiteList());
+        binlogJob.setSchema(binlogJobTaskConfig.getIncludeSchema());
+        binlogJob.setPort(binlogJobTaskConfig.getPort());
+        binlogJob.setOffsets(dataConfigs.getSnapshot());
+        binlogJob.setDdl(binlogJobTaskConfig.getMonitoredDdl());
+        binlogJob.setServerTimezone(binlogJobTaskConfig.getServerTimezone());
+
+        BinlogJob.Offset offset = new BinlogJob.Offset();
+        offset.setIntervalMs(binlogJobTaskConfig.getIntervalMs());
+        offset.setFilename(binlogJobTaskConfig.getOffsetFilename());
+        
offset.setSpecificOffsetFile(binlogJobTaskConfig.getSpecificOffsetFile());
+        
offset.setSpecificOffsetPos(binlogJobTaskConfig.getSpecificOffsetPos());
+
+        binlogJob.setOffset(offset);
+
+        BinlogJob.Snapshot snapshot = new BinlogJob.Snapshot();
+        snapshot.setMode(binlogJobTaskConfig.getSnapshotMode());
+
+        binlogJob.setSnapshot(snapshot);
+
+        BinlogJob.History history = new BinlogJob.History();
+        history.setFilename(binlogJobTaskConfig.getHistoryFilename());
+
+        binlogJob.setHistory(history);
+
+        return binlogJob;
+    }
+
+    private static FileTask getFileJob(DataConfig dataConfig) {
+        FileTask fileTask = new FileTask();
+        fileTask.setId(dataConfig.getTaskId());
+
+        FileTaskConfig taskConfig = GSON.fromJson(dataConfig.getExtParams(),
+                FileTaskConfig.class);
+
+        FileTask.Dir dir = new FileTask.Dir();
+        dir.setPatterns(taskConfig.getPattern());
+        dir.setBlackList(taskConfig.getBlackList());
+        fileTask.setDir(dir);
+        fileTask.setCollectType(taskConfig.getCollectType());
+        fileTask.setContentCollectType(taskConfig.getContentCollectType());
+        fileTask.setDataSeparator(taskConfig.getDataSeparator());
+        fileTask.setMaxFileCount(taskConfig.getMaxFileCount());
+        fileTask.setRetry(taskConfig.getRetry());
+        fileTask.setCycleUnit(taskConfig.getCycleUnit());
+        fileTask.setStartTime(taskConfig.getStartTime());
+        fileTask.setEndTime(taskConfig.getEndTime());
+        fileTask.setProperties(GSON.toJson(taskConfig.getProperties()));
+        if (taskConfig.getTimeOffset() != null) {
+            fileTask.setTimeOffset(taskConfig.getTimeOffset());
+        }
+
+        if (taskConfig.getAdditionalAttr() != null) {
+            fileTask.setAddictiveString(taskConfig.getAdditionalAttr());
+        }
+
+        if (null != taskConfig.getLineEndPattern()) {
+            FileTask.Line line = new Line();
+            line.setEndPattern(taskConfig.getLineEndPattern());
+            fileTask.setLine(line);
+        }
+
+        if (null != taskConfig.getEnvList()) {
+            fileTask.setEnvList(taskConfig.getEnvList());
+        }
+
+        if (null != taskConfig.getMetaFields()) {
+            fileTask.setMetaFields(GSON.toJson(taskConfig.getMetaFields()));
+        }
+
+        if (null != taskConfig.getFilterMetaByLabels()) {
+            
fileTask.setFilterMetaByLabels(GSON.toJson(taskConfig.getFilterMetaByLabels()));
+        }
+
+        if (null != taskConfig.getMonitorInterval()) {
+            fileTask.setMonitorInterval(taskConfig.getMonitorInterval());
+        }
+
+        if (null != taskConfig.getMonitorStatus()) {
+            fileTask.setMonitorStatus(taskConfig.getMonitorStatus());
+        }
+        return fileTask;
+    }
+
+    private static KafkaJob getKafkaJob(DataConfig dataConfigs) {
+
+        KafkaJob.KafkaJobTaskConfig kafkaJobTaskConfig = 
GSON.fromJson(dataConfigs.getExtParams(),
+                KafkaJob.KafkaJobTaskConfig.class);
+        KafkaJob kafkaJob = new KafkaJob();
+
+        KafkaJob.Bootstrap bootstrap = new KafkaJob.Bootstrap();
+        bootstrap.setServers(kafkaJobTaskConfig.getBootstrapServers());
+        kafkaJob.setBootstrap(bootstrap);
+        KafkaJob.Partition partition = new KafkaJob.Partition();
+        partition.setOffset(dataConfigs.getSnapshot());
+        kafkaJob.setPartition(partition);
+        KafkaJob.Group group = new KafkaJob.Group();
+        group.setId(kafkaJobTaskConfig.getGroupId());
+        kafkaJob.setGroup(group);
+        KafkaJob.RecordSpeed recordSpeed = new KafkaJob.RecordSpeed();
+        recordSpeed.setLimit(kafkaJobTaskConfig.getRecordSpeedLimit());
+        kafkaJob.setRecordSpeed(recordSpeed);
+        KafkaJob.ByteSpeed byteSpeed = new KafkaJob.ByteSpeed();
+        byteSpeed.setLimit(kafkaJobTaskConfig.getByteSpeedLimit());
+        kafkaJob.setByteSpeed(byteSpeed);
+        kafkaJob.setAutoOffsetReset(kafkaJobTaskConfig.getAutoOffsetReset());
+
+        kafkaJob.setTopic(kafkaJobTaskConfig.getTopic());
+
+        return kafkaJob;
+    }
+
+    private static PostgreSQLJob getPostgresJob(DataConfig dataConfigs) {
+        PostgreSQLJob.PostgreSQLJobConfig config = 
GSON.fromJson(dataConfigs.getExtParams(),
+                PostgreSQLJob.PostgreSQLJobConfig.class);
+        PostgreSQLJob postgreSQLJob = new PostgreSQLJob();
+
+        postgreSQLJob.setUser(config.getUsername());
+        postgreSQLJob.setPassword(config.getPassword());
+        postgreSQLJob.setHostname(config.getHostname());
+        postgreSQLJob.setPort(config.getPort());
+        postgreSQLJob.setDbname(config.getDatabase());
+        postgreSQLJob.setServername(config.getSchema());
+        postgreSQLJob.setPluginname(config.getDecodingPluginName());
+        postgreSQLJob.setTableNameList(config.getTableNameList());
+        postgreSQLJob.setServerTimeZone(config.getServerTimeZone());
+        postgreSQLJob.setScanStartupMode(config.getScanStartupMode());
+        postgreSQLJob.setPrimaryKey(config.getPrimaryKey());
+
+        return postgreSQLJob;
+    }
+
+    private static RedisJob getRedisJob(DataConfig dataConfig) {
+        RedisJob.RedisJobConfig config = 
GSON.fromJson(dataConfig.getExtParams(), RedisJob.RedisJobConfig.class);
+        RedisJob redisJob = new RedisJob();
+
+        redisJob.setAuthUser(config.getUsername());
+        redisJob.setAuthPassword(config.getPassword());
+        redisJob.setHostname(config.getHostname());
+        redisJob.setPort(config.getPort());
+        redisJob.setSsl(config.getSsl());
+        redisJob.setReadTimeout(config.getTimeout());
+        redisJob.setQueueSize(config.getQueueSize());
+        redisJob.setReplId(config.getReplId());
+
+        return redisJob;
+    }
+
+    private static MongoJob getMongoJob(DataConfig dataConfigs) {
+
+        MongoJob.MongoJobTaskConfig config = 
GSON.fromJson(dataConfigs.getExtParams(),
+                MongoJob.MongoJobTaskConfig.class);
+        MongoJob mongoJob = new MongoJob();
+
+        mongoJob.setHosts(config.getHosts());
+        mongoJob.setUser(config.getUsername());
+        mongoJob.setPassword(config.getPassword());
+        mongoJob.setDatabaseIncludeList(config.getDatabaseIncludeList());
+        mongoJob.setDatabaseExcludeList(config.getDatabaseExcludeList());
+        mongoJob.setCollectionIncludeList(config.getCollectionIncludeList());
+        mongoJob.setCollectionExcludeList(config.getCollectionExcludeList());
+        mongoJob.setFieldExcludeList(config.getFieldExcludeList());
+        mongoJob.setConnectTimeoutInMs(config.getConnectTimeoutInMs());
+        mongoJob.setQueueSize(config.getQueueSize());
+        mongoJob.setCursorMaxAwaitTimeInMs(config.getCursorMaxAwaitTimeInMs());
+        mongoJob.setSocketTimeoutInMs(config.getSocketTimeoutInMs());
+        mongoJob.setSelectionTimeoutInMs(config.getSelectionTimeoutInMs());
+        mongoJob.setFieldRenames(config.getFieldRenames());
+        mongoJob.setMembersAutoDiscover(config.getMembersAutoDiscover());
+        mongoJob.setConnectMaxAttempts(config.getConnectMaxAttempts());
+        
mongoJob.setConnectBackoffMaxDelayInMs(config.getConnectBackoffMaxDelayInMs());
+        
mongoJob.setConnectBackoffInitialDelayInMs(config.getConnectBackoffInitialDelayInMs());
+        mongoJob.setInitialSyncMaxThreads(config.getInitialSyncMaxThreads());
+        
mongoJob.setSslInvalidHostnameAllowed(config.getSslInvalidHostnameAllowed());
+        mongoJob.setSslEnabled(config.getSslEnabled());
+        mongoJob.setPollIntervalInMs(config.getPollIntervalInMs());
+
+        MongoJob.Offset offset = new MongoJob.Offset();
+        offset.setFilename(config.getOffsetFilename());
+        offset.setSpecificOffsetFile(config.getSpecificOffsetFile());
+        offset.setSpecificOffsetPos(config.getSpecificOffsetPos());
+        mongoJob.setOffset(offset);
+
+        MongoJob.Snapshot snapshot = new MongoJob.Snapshot();
+        snapshot.setMode(config.getSnapshotMode());
+        mongoJob.setSnapshot(snapshot);
+
+        MongoJob.History history = new MongoJob.History();
+        history.setFilename(config.getHistoryFilename());
+        mongoJob.setHistory(history);
+
+        return mongoJob;
+    }
+
+    private static OracleJob getOracleJob(DataConfig dataConfigs) {
+        OracleJob.OracleJobConfig config = 
GSON.fromJson(dataConfigs.getExtParams(),
+                OracleJob.OracleJobConfig.class);
+        OracleJob oracleJob = new OracleJob();
+        oracleJob.setUser(config.getUser());
+        oracleJob.setHostname(config.getHostname());
+        oracleJob.setPassword(config.getPassword());
+        oracleJob.setPort(config.getPort());
+        oracleJob.setServerName(config.getServerName());
+        oracleJob.setDbname(config.getDbname());
+
+        OracleJob.Offset offset = new OracleJob.Offset();
+        offset.setFilename(config.getOffsetFilename());
+        offset.setSpecificOffsetFile(config.getSpecificOffsetFile());
+        offset.setSpecificOffsetPos(config.getSpecificOffsetPos());
+        oracleJob.setOffset(offset);
+
+        OracleJob.Snapshot snapshot = new OracleJob.Snapshot();
+        snapshot.setMode(config.getSnapshotMode());
+        oracleJob.setSnapshot(snapshot);
+
+        OracleJob.History history = new OracleJob.History();
+        history.setFilename(config.getHistoryFilename());
+        oracleJob.setHistory(history);
+
+        return oracleJob;
+    }
+
+    private static SqlServerJob getSqlServerJob(DataConfig dataConfigs) {
+        SqlServerJob.SqlserverJobConfig config = 
GSON.fromJson(dataConfigs.getExtParams(),
+                SqlServerJob.SqlserverJobConfig.class);
+        SqlServerJob sqlServerJob = new SqlServerJob();
+        sqlServerJob.setUser(config.getUsername());
+        sqlServerJob.setHostname(config.getHostname());
+        sqlServerJob.setPassword(config.getPassword());
+        sqlServerJob.setPort(config.getPort());
+        sqlServerJob.setServerName(config.getSchemaName());
+        sqlServerJob.setDbname(config.getDatabase());
+
+        SqlServerJob.Offset offset = new SqlServerJob.Offset();
+        offset.setFilename(config.getOffsetFilename());
+        offset.setSpecificOffsetFile(config.getSpecificOffsetFile());
+        offset.setSpecificOffsetPos(config.getSpecificOffsetPos());
+        sqlServerJob.setOffset(offset);
+
+        SqlServerJob.Snapshot snapshot = new SqlServerJob.Snapshot();
+        snapshot.setMode(config.getSnapshotMode());
+        sqlServerJob.setSnapshot(snapshot);
+
+        SqlServerJob.History history = new SqlServerJob.History();
+        history.setFilename(config.getHistoryFilename());
+        sqlServerJob.setHistory(history);
+
+        return sqlServerJob;
+    }
+
+    public static MqttJob getMqttJob(DataConfig dataConfigs) {
+        MqttJob.MqttJobConfig config = 
GSON.fromJson(dataConfigs.getExtParams(),
+                MqttJob.MqttJobConfig.class);
+        MqttJob mqttJob = new MqttJob();
+
+        mqttJob.setServerURI(config.getServerURI());
+        mqttJob.setUserName(config.getUsername());
+        mqttJob.setPassword(config.getPassword());
+        mqttJob.setTopic(config.getTopic());
+        mqttJob.setConnectionTimeOut(config.getConnectionTimeOut());
+        mqttJob.setKeepAliveInterval(config.getKeepAliveInterval());
+        mqttJob.setQos(config.getQos());
+        mqttJob.setCleanSession(config.getCleanSession());
+        mqttJob.setClientIdPrefix(config.getClientId());
+        mqttJob.setQueueSize(config.getQueueSize());
+        mqttJob.setAutomaticReconnect(config.getAutomaticReconnect());
+        mqttJob.setMqttVersion(config.getMqttVersion());
+
+        return mqttJob;
+    }
+
+    private static Proxy getProxy(DataConfig dataConfigs) {
+        Proxy proxy = new Proxy();
+        Manager manager = new Manager();
+        AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
+        manager.setHost(agentConf.get(AGENT_MANAGER_VIP_HTTP_HOST));
+        manager.setPort(agentConf.get(AGENT_MANAGER_VIP_HTTP_PORT));
+        proxy.setInlongGroupId(dataConfigs.getInlongGroupId());
+        proxy.setInlongStreamId(dataConfigs.getInlongStreamId());
+        proxy.setManager(manager);
+        if (null != dataConfigs.getSyncSend()) {
+            proxy.setSync(dataConfigs.getSyncSend() == SYNC_SEND_OPEN);
+        }
+        if (null != dataConfigs.getSyncPartitionKey()) {
+            proxy.setPartitionKey(dataConfigs.getSyncPartitionKey());
+        }
+        return proxy;
+    }
+
+    /**
+     * convert DataConfig to TaskProfile
+     */
+    public static TaskProfile convertToTaskProfile(DataConfig dataConfig) {
+        if (!dataConfig.isValid()) {
+            throw new IllegalArgumentException("input dataConfig" + dataConfig 
+ "is invalid please check");
+        }
+
+        TaskProfileDto profileDto = new TaskProfileDto();
+        Proxy proxy = getProxy(dataConfig);
+        profileDto.setProxy(proxy);
+        Task task = new Task();
+
+        // common attribute
+        task.setId(String.valueOf(dataConfig.getTaskId()));
+        task.setGroupId(dataConfig.getInlongGroupId());
+        task.setStreamId(dataConfig.getInlongStreamId());
+        task.setChannel(DEFAULT_CHANNEL);
+        task.setIp(dataConfig.getIp());
+        task.setOp(dataConfig.getOp());
+        task.setDeliveryTime(dataConfig.getDeliveryTime());
+        task.setUuid(dataConfig.getUuid());
+        task.setVersion(dataConfig.getVersion());
+        task.setState(dataConfig.getState());
+
+        // set sink type
+        if (dataConfig.getDataReportType() == 0) {

Review Comment:
   0 and 1 can be constant with meaningful name , the best way is making type a 
enum 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to