This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 3aeb0c8d14 [INLONG-10577][Sort] Simplified SortStandalone
SortSdkSource (#10578)
3aeb0c8d14 is described below
commit 3aeb0c8d14ef8fe5a596f899a48dbab919996e63
Author: vernedeng <[email protected]>
AuthorDate: Tue Jul 9 15:45:49 2024 +0800
[INLONG-10577][Sort] Simplified SortStandalone SortSdkSource (#10578)
---
.../standalone/source/sortsdk/SortSdkSource.java | 20 +-
.../source/sortsdk/v2/SortSdkSource.java | 205 ---------------------
.../source/sortsdk/TestSortSdkSource.java | 5 +-
3 files changed, 8 insertions(+), 222 deletions(-)
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
index 199a583b9a..ac4726f241 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
@@ -17,7 +17,6 @@
package org.apache.inlong.sort.standalone.source.sortsdk;
-import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
import org.apache.inlong.sdk.commons.admin.AdminServiceRegister;
import org.apache.inlong.sdk.sort.api.QueryConsumeConfig;
import org.apache.inlong.sdk.sort.api.SortClient;
@@ -26,11 +25,9 @@ import org.apache.inlong.sdk.sort.api.SortClientFactory;
import org.apache.inlong.sort.standalone.admin.ConsumerServiceMBean;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.config.holder.ManagerUrlHandler;
-import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigType;
import org.apache.inlong.sort.standalone.config.holder.SortSourceConfigType;
import
org.apache.inlong.sort.standalone.config.loader.ClassResourceQueryConsumeConfig;
-import org.apache.inlong.sort.standalone.utils.FlumeConfigGenerator;
import org.apache.commons.lang3.ClassUtils;
import org.apache.flume.Context;
@@ -87,6 +84,7 @@ public final class SortSdkSource extends AbstractSource
private static final SortClientConfig.ConsumeStrategy defaultStrategy =
SortClientConfig.ConsumeStrategy.lastest;
private static final String KEY_SORT_SDK_CLIENT_NUM = "sortSdkClientNum";
+ private static final String KEY_TASK_NAME = "taskName";
private static final int DEFAULT_SORT_SDK_CLIENT_NUM = 1;
private String taskName;
@@ -149,9 +147,9 @@ public final class SortSdkSource extends AbstractSource
*/
@Override
public void configure(Context context) {
- this.taskName = context.getString(FlumeConfigGenerator.KEY_TASK_NAME);
+ this.taskName = context.getString(KEY_TASK_NAME);
this.context = new SortSdkSourceContext(getName(), context);
- this.sortClusterName =
SortClusterConfigHolder.getClusterConfig().getClusterName();
+ this.sortClusterName = CommonPropertiesHolder.getClusterId();
this.reloadInterval = this.context.getReloadInterval();
this.initReloadExecutor();
// register
@@ -235,18 +233,8 @@ public final class SortSdkSource extends AbstractSource
* @return Map
*/
private Map<String, String> getSortClientConfigParameters() {
- Map<String, String> sortSdkParams = new HashMap<>();
Map<String, String> commonParams =
CommonPropertiesHolder.getContext().getSubProperties(SORT_SDK_PREFIX);
- sortSdkParams.putAll(commonParams);
- SortTaskConfig taskConfig =
SortClusterConfigHolder.getTaskConfig(taskName);
- if (taskConfig != null) {
- Map<String, String> sinkParams = taskConfig.getSinkParams();
- if (sinkParams != null) {
- Context sinkContext = new Context(sinkParams);
-
sortSdkParams.putAll(sinkContext.getSubProperties(SORT_SDK_PREFIX));
- }
- }
- return sortSdkParams;
+ return new HashMap<>(commonParams);
}
/**
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/v2/SortSdkSource.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/v2/SortSdkSource.java
deleted file mode 100644
index 43245da9d0..0000000000
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/v2/SortSdkSource.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.standalone.source.sortsdk.v2;
-
-import org.apache.inlong.sdk.commons.admin.AdminServiceRegister;
-import org.apache.inlong.sdk.sort.api.QueryConsumeConfig;
-import org.apache.inlong.sdk.sort.api.SortClient;
-import org.apache.inlong.sdk.sort.api.SortClientConfig;
-import org.apache.inlong.sdk.sort.api.SortClientFactory;
-import org.apache.inlong.sort.standalone.admin.ConsumerServiceMBean;
-import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
-import org.apache.inlong.sort.standalone.config.holder.ManagerUrlHandler;
-import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigType;
-import org.apache.inlong.sort.standalone.config.holder.SortSourceConfigType;
-import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
-import
org.apache.inlong.sort.standalone.config.loader.ClassResourceQueryConsumeConfig;
-import
org.apache.inlong.sort.standalone.source.sortsdk.DefaultTopicChangeListener;
-import org.apache.inlong.sort.standalone.source.sortsdk.FetchCallback;
-import org.apache.inlong.sort.standalone.source.sortsdk.SortSdkSourceContext;
-import org.apache.inlong.sort.standalone.utils.v2.FlumeConfigGenerator;
-
-import org.apache.commons.lang3.ClassUtils;
-import org.apache.flume.Context;
-import org.apache.flume.EventDrivenSource;
-import org.apache.flume.conf.Configurable;
-import org.apache.flume.source.AbstractSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Default Source implementation of InLong.
- *
- * <p>
- * SortSdkSource acquired msg from different upstream data store by register
{@link SortClient} for each sort task. The
- * only things SortSdkSource should do is to get one client by the sort task
id, or remove one client when the task is
- * finished or schedule to other source instance.
- * </p>
- *
- * <p>
- * The Default Manager of InLong will schedule the partition and topic
automatically.
- * </p>
- *
- * <p>
- * Because all sources should implement {@link Configurable}, the
SortSdkSource should have default constructor
- * <b>WITHOUT</b> any arguments, and parameters will be configured by {@link
Configurable#configure(Context)}.
- * </p>
- */
-public final class SortSdkSource extends AbstractSource
- implements
- Configurable,
- Runnable,
- EventDrivenSource,
- ConsumerServiceMBean {
-
- private static final Logger LOG =
LoggerFactory.getLogger(SortSdkSource.class);
- public static final String SORT_SDK_PREFIX = "sortsdk.";
- private static final int CORE_POOL_SIZE = 1;
- private static final SortClientConfig.ConsumeStrategy defaultStrategy =
SortClientConfig.ConsumeStrategy.lastest;
- private static final String KEY_SORT_SDK_CLIENT_NUM = "sortSdkClientNum";
- private static final int DEFAULT_SORT_SDK_CLIENT_NUM = 1;
- private String taskName;
- private SortSdkSourceContext context;
- private String sortClusterName;
- private long reloadInterval;
- private ScheduledExecutorService pool;
-
- private List<SortClient> sortClients = new ArrayList<>();
-
- @Override
- public synchronized void start() {
- int sortSdkClientNum =
CommonPropertiesHolder.getInteger(KEY_SORT_SDK_CLIENT_NUM,
DEFAULT_SORT_SDK_CLIENT_NUM);
- LOG.info("start SortSdkSource:{}, client num is {}", taskName,
sortSdkClientNum);
- for (int i = 0; i < sortSdkClientNum; i++) {
- SortClient client = this.newClient(taskName);
- if (client != null) {
- this.sortClients.add(client);
- }
- }
- }
-
- @Override
- public void stop() {
- pool.shutdownNow();
- LOG.info("close sort client {}.", taskName);
- for (SortClient sortClient : sortClients) {
- sortClient.getConfig().setStopConsume(true);
- sortClient.close();
- }
- }
-
- @Override
- public void run() {
- LOG.info("start to reload SortSdkSource:{}", taskName);
- for (SortClient sortClient : sortClients) {
-
sortClient.getConfig().setManagerApiUrl(ManagerUrlHandler.getSortSourceConfigUrl());
- }
- }
-
- @Override
- public void configure(Context context) {
- this.taskName = context.getString(FlumeConfigGenerator.KEY_TASK_NAME);
- this.context = new SortSdkSourceContext(getName(), context);
- this.sortClusterName =
SortConfigHolder.getSortConfig().getSortClusterName();
- this.reloadInterval = this.context.getReloadInterval();
- this.initReloadExecutor();
- // register
- AdminServiceRegister.register(ConsumerServiceMBean.MBEAN_TYPE,
taskName, this);
- }
-
- private void initReloadExecutor() {
- this.pool = Executors.newScheduledThreadPool(CORE_POOL_SIZE);
- pool.scheduleAtFixedRate(this, reloadInterval, reloadInterval,
TimeUnit.SECONDS);
- }
-
- private SortClient newClient(final String sortTaskName) {
- LOG.info("start a new sort client for task: {}", sortTaskName);
- try {
- final SortClientConfig clientConfig = new
SortClientConfig(sortTaskName, this.sortClusterName,
- new DefaultTopicChangeListener(),
- SortSdkSource.defaultStrategy,
InetAddress.getLocalHost().getHostAddress());
- final FetchCallback callback =
FetchCallback.Factory.create(sortTaskName, getChannelProcessor(), context);
- clientConfig.setCallback(callback);
- Map<String, String> sortSdkParams =
this.getSortClientConfigParameters();
- clientConfig.setParameters(sortSdkParams);
-
- // create SortClient
- String configType = CommonPropertiesHolder
- .getString(SortSourceConfigType.KEY_TYPE,
SortSourceConfigType.MANAGER.name());
- SortClient client = null;
- if
(SortClusterConfigType.FILE.name().equalsIgnoreCase(configType)) {
- LOG.info("create sort sdk client in file way:{}", configType);
- ClassResourceQueryConsumeConfig queryConfig = new
ClassResourceQueryConsumeConfig();
- client = SortClientFactory.createSortClient(clientConfig,
queryConfig);
- } else if
(SortClusterConfigType.MANAGER.name().equalsIgnoreCase(configType)) {
- LOG.info("create sort sdk client in manager way:{}",
configType);
-
clientConfig.setManagerApiUrl(ManagerUrlHandler.getSortSourceConfigUrl());
- client = SortClientFactory.createSortClient(clientConfig);
- } else {
- LOG.info("create sort sdk client in custom way:{}",
configType);
- Class<?> loaderClass = ClassUtils.getClass(configType);
- Object loaderObject =
loaderClass.getDeclaredConstructor().newInstance();
- if (loaderObject instanceof Configurable) {
- ((Configurable) loaderObject).configure(new
Context(CommonPropertiesHolder.get()));
- }
- if (!(loaderObject instanceof QueryConsumeConfig)) {
- LOG.error("got exception when create QueryConsumeConfig
instance, config key:{},config class:{}",
- SortSourceConfigType.KEY_TYPE, configType);
- return null;
- }
- // if it specifies the type of QueryConsumeConfig.
- client = SortClientFactory.createSortClient(clientConfig,
(QueryConsumeConfig) loaderObject);
- }
-
- client.init();
- callback.setClient(client);
- return client;
- } catch (Throwable th) {
- LOG.error("got one throwable when init client of id:{}",
sortTaskName, th);
- }
- return null;
- }
-
- private Map<String, String> getSortClientConfigParameters() {
- Map<String, String> commonParams =
CommonPropertiesHolder.getContext().getSubProperties(SORT_SDK_PREFIX);
- return new HashMap<>(commonParams);
- }
-
- @Override
- public void stopConsumer() {
- for (SortClient sortClient : sortClients) {
- sortClient.getConfig().setStopConsume(true);
- }
- }
-
- @Override
- public void recoverConsumer() {
- for (SortClient sortClient : sortClients) {
- sortClient.getConfig().setStopConsume(false);
- }
- }
-}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/source/sortsdk/TestSortSdkSource.java
b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/source/sortsdk/TestSortSdkSource.java
index fcac01fc15..a8a972b2e6 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/source/sortsdk/TestSortSdkSource.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/source/sortsdk/TestSortSdkSource.java
@@ -22,6 +22,7 @@ import
org.apache.inlong.common.pojo.sortstandalone.SortClusterConfig;
import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
import org.apache.inlong.sdk.commons.admin.AdminServiceRegister;
import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.apache.flume.Context;
import org.junit.Assert;
@@ -45,7 +46,7 @@ import static org.mockito.ArgumentMatchers.anyString;
@RunWith(PowerMockRunner.class)
@PowerMockIgnore("javax.management.*")
@PrepareForTest({SortClusterConfigHolder.class, LoggerFactory.class,
Logger.class, MetricRegister.class,
- AdminServiceRegister.class})
+ AdminServiceRegister.class, InlongLoggerFactory.class})
public class TestSortSdkSource {
private Context mockContext;
@@ -55,6 +56,8 @@ public class TestSortSdkSource {
PowerMockito.mockStatic(LoggerFactory.class);
Logger log = PowerMockito.mock(Logger.class);
PowerMockito.when(LoggerFactory.getLogger(Mockito.any(Class.class))).thenReturn(log);
+ PowerMockito.mockStatic(InlongLoggerFactory.class);
+
PowerMockito.when(InlongLoggerFactory.getLogger(Mockito.any(Class.class))).thenReturn(log);
PowerMockito.mockStatic(MetricRegister.class);
PowerMockito.mockStatic(SortClusterConfigHolder.class);