This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 206a7adc4 [INLONG-5117][SortStandalone] Support specify component type
from remote config (#5134)
206a7adc4 is described below
commit 206a7adc4ceda0edfa40a2dcb8483fc8134aeb82
Author: vernedeng <[email protected]>
AuthorDate: Wed Jul 20 14:11:14 2022 +0800
[INLONG-5117][SortStandalone] Support specify component type from remote
config (#5134)
---
.../standalone/utils/FlumeConfigGenerator.java | 60 +++++++++++++++-------
1 file changed, 42 insertions(+), 18 deletions(-)
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java
index f1ac2a419..2981d1982 100644
---
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java
@@ -22,6 +22,7 @@ import
org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
/**
* generator for flume config
@@ -32,43 +33,48 @@ public class FlumeConfigGenerator {
public static final String KEY_SORT_CHANNEL_TYPE = "sortChannel.type";
public static final String KEY_SORT_SINK_TYPE = "sortSink.type";
public static final String KEY_SORT_SOURCE_TYPE = "sortSource.type";
+ public static final String KEY_SDK_START_TIME = "sortSdk.startTime";
+ public static final String KEY_SDK_STOP_TIME = "sortSdk.stopTime";
public static Map<String, String>
generateFlumeConfiguration(SortTaskConfig taskConfig) {
Map<String, String> flumeConf = new HashMap<>();
String name = taskConfig.getName();
Map<String, String> sinkParams = taskConfig.getSinkParams();
// channels
- appendChannels(flumeConf, name);
+ appendChannels(flumeConf, name, sinkParams);
// sinks
appendSinks(flumeConf, name, sinkParams);
// sources
- appendSources(flumeConf, name);
+ appendSources(flumeConf, name, sinkParams);
return flumeConf;
}
/**
- * appendChannels
+ * append channels config
*
- * @param flumeConf
+ * @param flumeConf final config of flume
+ * @param name sort task name
+ * @param sinkParams sink params of this task
*/
- private static void appendChannels(Map<String, String> flumeConf, String
name) {
+ private static void appendChannels(Map<String, String> flumeConf, String
name, Map<String, String> sinkParams) {
StringBuilder builder = new StringBuilder();
String channelName = name + "Channel";
flumeConf.put(name + ".channels", channelName);
String prefix =
builder.append(name).append(".channels.").append(channelName).append(".").toString();
builder.setLength(0);
String channelType = builder.append(prefix).append("type").toString();
- String channelClass =
CommonPropertiesHolder.getString(KEY_SORT_CHANNEL_TYPE);
+ String channelClass = sinkParams.getOrDefault(KEY_SORT_CHANNEL_TYPE,
+ CommonPropertiesHolder.getString(KEY_SORT_CHANNEL_TYPE));
flumeConf.put(channelType, channelClass);
appendCommon(flumeConf, prefix, null, name);
}
/**
- * appendCommon
+ * appendCommon config
*
- * @param flumeConf
- * @param prefix
- * @param componentParams
+ * @param flumeConf final config of flume
+ * @param prefix prefix of common properties
+ * @param componentParams common properties
*/
private static void appendCommon(
Map<String, String> flumeConf,
@@ -95,9 +101,11 @@ public class FlumeConfigGenerator {
}
/**
- * appendSinks
+ * append sink config
*
- * @param flumeConf
+ * @param flumeConf final config of flume
+ * @param name sort task name
+ * @param sinkParams sink params of this task
*/
private static void appendSinks(Map<String, String> flumeConf, String
name, Map<String, String> sinkParams) {
// sinks
@@ -108,7 +116,8 @@ public class FlumeConfigGenerator {
// type
builder.setLength(0);
String sinkType = builder.append(prefix).append("type").toString();
- String sinkClass =
CommonPropertiesHolder.getString(KEY_SORT_SINK_TYPE);
+ String sinkClass = sinkParams.getOrDefault(KEY_SORT_SINK_TYPE,
+ CommonPropertiesHolder.getString(KEY_SORT_SINK_TYPE));
flumeConf.put(sinkType, sinkClass);
// channel
builder.setLength(0);
@@ -120,11 +129,16 @@ public class FlumeConfigGenerator {
}
/**
- * appendSources
+ * append source config
*
- * @param flumeConf
+ * @param flumeConf final config of flume
+ * @param name sort task name
+ * @param sinkParams sink params of this task
*/
- private static void appendSources(Map<String, String> flumeConf, String
name) {
+ private static void appendSources(
+ Map<String, String> flumeConf,
+ String name, Map<String,
+ String> sinkParams) {
// sources
String sourceName = name + "Source";
flumeConf.put(name + ".sources", sourceName);
@@ -133,7 +147,8 @@ public class FlumeConfigGenerator {
// type
builder.setLength(0);
String sourceType = builder.append(prefix).append("type").toString();
- String sourceClass =
CommonPropertiesHolder.getString(KEY_SORT_SOURCE_TYPE);
+ String sourceClass = sinkParams.getOrDefault(KEY_SORT_SOURCE_TYPE,
+ CommonPropertiesHolder.getString(KEY_SORT_SOURCE_TYPE));
flumeConf.put(sourceType, sourceClass);
// channel
builder.setLength(0);
@@ -144,7 +159,16 @@ public class FlumeConfigGenerator {
builder.setLength(0);
String selectorTypeKey =
builder.append(prefix).append("selector.type").toString();
flumeConf.put(selectorTypeKey,
"org.apache.flume.channel.ReplicatingChannelSelector");
- //
+ // valid msg time interval
+ builder.setLength(0);
+ String startTimeKey =
builder.append(prefix).append(KEY_SDK_START_TIME).toString();
+ Optional.ofNullable(sinkParams.get(KEY_SDK_START_TIME))
+ .map(startTime -> flumeConf.put(startTimeKey, startTime));
+ builder.setLength(0);
+ String stopTimeKey =
builder.append(prefix).append(KEY_SDK_STOP_TIME).toString();
+ Optional.ofNullable(sinkParams.get(KEY_SDK_STOP_TIME))
+ .map(stopTime -> flumeConf.put(stopTimeKey, stopTime));
+
appendCommon(flumeConf, prefix, null, name);
}
}