This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 206a7adc4 [INLONG-5117][SortStandalone] Support specify component type from remote config (#5134)
206a7adc4 is described below

commit 206a7adc4ceda0edfa40a2dcb8483fc8134aeb82
Author: vernedeng <[email protected]>
AuthorDate: Wed Jul 20 14:11:14 2022 +0800

    [INLONG-5117][SortStandalone] Support specify component type from remote config (#5134)
---
 .../standalone/utils/FlumeConfigGenerator.java     | 60 +++++++++++++++-------
 1 file changed, 42 insertions(+), 18 deletions(-)

diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java
index f1ac2a419..2981d1982 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java
@@ -22,6 +22,7 @@ import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * generator for flume config
@@ -32,43 +33,48 @@ public class FlumeConfigGenerator {
     public static final String KEY_SORT_CHANNEL_TYPE = "sortChannel.type";
     public static final String KEY_SORT_SINK_TYPE = "sortSink.type";
     public static final String KEY_SORT_SOURCE_TYPE = "sortSource.type";
+    public static final String KEY_SDK_START_TIME = "sortSdk.startTime";
+    public static final String KEY_SDK_STOP_TIME = "sortSdk.stopTime";
 
     public static Map<String, String> 
generateFlumeConfiguration(SortTaskConfig taskConfig) {
         Map<String, String> flumeConf = new HashMap<>();
         String name = taskConfig.getName();
         Map<String, String> sinkParams = taskConfig.getSinkParams();
         // channels
-        appendChannels(flumeConf, name);
+        appendChannels(flumeConf, name, sinkParams);
         // sinks
         appendSinks(flumeConf, name, sinkParams);
         // sources
-        appendSources(flumeConf, name);
+        appendSources(flumeConf, name, sinkParams);
         return flumeConf;
     }
 
     /**
-     * appendChannels
+     * append channels config
      *
-     * @param flumeConf
+     * @param flumeConf final config of flume
+     * @param name sort task name
+     * @param sinkParams sink params of this task
      */
-    private static void appendChannels(Map<String, String> flumeConf, String 
name) {
+    private static void appendChannels(Map<String, String> flumeConf, String 
name, Map<String, String> sinkParams) {
         StringBuilder builder = new StringBuilder();
         String channelName = name + "Channel";
         flumeConf.put(name + ".channels", channelName);
         String prefix = 
builder.append(name).append(".channels.").append(channelName).append(".").toString();
         builder.setLength(0);
         String channelType = builder.append(prefix).append("type").toString();
-        String channelClass = 
CommonPropertiesHolder.getString(KEY_SORT_CHANNEL_TYPE);
+        String channelClass = sinkParams.getOrDefault(KEY_SORT_CHANNEL_TYPE,
+                CommonPropertiesHolder.getString(KEY_SORT_CHANNEL_TYPE));
         flumeConf.put(channelType, channelClass);
         appendCommon(flumeConf, prefix, null, name);
     }
 
     /**
-     * appendCommon
+     * append common config
      *
-     * @param flumeConf
-     * @param prefix
-     * @param componentParams
+     * @param flumeConf final config of flume
+     * @param prefix prefix of common properties
+     * @param componentParams common properties
      */
     private static void appendCommon(
             Map<String, String> flumeConf,
@@ -95,9 +101,11 @@ public class FlumeConfigGenerator {
     }
 
     /**
-     * appendSinks
+     * append sink config
      *
-     * @param flumeConf
+     * @param flumeConf final config of flume
+     * @param name sort task name
+     * @param sinkParams sink params of this task
      */
     private static void appendSinks(Map<String, String> flumeConf, String 
name, Map<String, String> sinkParams) {
         // sinks
@@ -108,7 +116,8 @@ public class FlumeConfigGenerator {
         // type
         builder.setLength(0);
         String sinkType = builder.append(prefix).append("type").toString();
-        String sinkClass = 
CommonPropertiesHolder.getString(KEY_SORT_SINK_TYPE);
+        String sinkClass = sinkParams.getOrDefault(KEY_SORT_SINK_TYPE,
+                CommonPropertiesHolder.getString(KEY_SORT_SINK_TYPE));
         flumeConf.put(sinkType, sinkClass);
         // channel
         builder.setLength(0);
@@ -120,11 +129,16 @@ public class FlumeConfigGenerator {
     }
 
     /**
-     * appendSources
+     * append source config
      *
-     * @param flumeConf
+     * @param flumeConf final config of flume
+     * @param name sort task name
+     * @param sinkParams sink params of this task
      */
-    private static void appendSources(Map<String, String> flumeConf, String 
name) {
+    private static void appendSources(
+            Map<String, String> flumeConf,
+            String name, Map<String,
+            String> sinkParams) {
         // sources
         String sourceName = name + "Source";
         flumeConf.put(name + ".sources", sourceName);
@@ -133,7 +147,8 @@ public class FlumeConfigGenerator {
         // type
         builder.setLength(0);
         String sourceType = builder.append(prefix).append("type").toString();
-        String sourceClass = 
CommonPropertiesHolder.getString(KEY_SORT_SOURCE_TYPE);
+        String sourceClass = sinkParams.getOrDefault(KEY_SORT_SOURCE_TYPE,
+                CommonPropertiesHolder.getString(KEY_SORT_SOURCE_TYPE));
         flumeConf.put(sourceType, sourceClass);
         // channel
         builder.setLength(0);
@@ -144,7 +159,16 @@ public class FlumeConfigGenerator {
         builder.setLength(0);
         String selectorTypeKey = 
builder.append(prefix).append("selector.type").toString();
         flumeConf.put(selectorTypeKey, 
"org.apache.flume.channel.ReplicatingChannelSelector");
-        //
+        // valid msg time interval
+        builder.setLength(0);
+        String startTimeKey = 
builder.append(prefix).append(KEY_SDK_START_TIME).toString();
+        Optional.ofNullable(sinkParams.get(KEY_SDK_START_TIME))
+                .map(startTime -> flumeConf.put(startTimeKey, startTime));
+        builder.setLength(0);
+        String stopTimeKey = 
builder.append(prefix).append(KEY_SDK_STOP_TIME).toString();
+        Optional.ofNullable(sinkParams.get(KEY_SDK_STOP_TIME))
+                .map(stopTime -> flumeConf.put(stopTimeKey, stopTime));
+
         appendCommon(flumeConf, prefix, null, name);
     }
 }

Reply via email to