This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3aeb0c8d14 [INLONG-10577][Sort] Simplified SortStandalone SortSdkSource (#10578)
3aeb0c8d14 is described below

commit 3aeb0c8d14ef8fe5a596f899a48dbab919996e63
Author: vernedeng <[email protected]>
AuthorDate: Tue Jul 9 15:45:49 2024 +0800

    [INLONG-10577][Sort] Simplified SortStandalone SortSdkSource (#10578)
---
 .../standalone/source/sortsdk/SortSdkSource.java   |  20 +-
 .../source/sortsdk/v2/SortSdkSource.java           | 205 ---------------------
 .../source/sortsdk/TestSortSdkSource.java          |   5 +-
 3 files changed, 8 insertions(+), 222 deletions(-)

diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
index 199a583b9a..ac4726f241 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
@@ -17,7 +17,6 @@
 
 package org.apache.inlong.sort.standalone.source.sortsdk;
 
-import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
 import org.apache.inlong.sdk.commons.admin.AdminServiceRegister;
 import org.apache.inlong.sdk.sort.api.QueryConsumeConfig;
 import org.apache.inlong.sdk.sort.api.SortClient;
@@ -26,11 +25,9 @@ import org.apache.inlong.sdk.sort.api.SortClientFactory;
 import org.apache.inlong.sort.standalone.admin.ConsumerServiceMBean;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.sort.standalone.config.holder.ManagerUrlHandler;
-import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
 import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigType;
 import org.apache.inlong.sort.standalone.config.holder.SortSourceConfigType;
 import org.apache.inlong.sort.standalone.config.loader.ClassResourceQueryConsumeConfig;
-import org.apache.inlong.sort.standalone.utils.FlumeConfigGenerator;
 
 import org.apache.commons.lang3.ClassUtils;
 import org.apache.flume.Context;
@@ -87,6 +84,7 @@ public final class SortSdkSource extends AbstractSource
     private static final SortClientConfig.ConsumeStrategy defaultStrategy = SortClientConfig.ConsumeStrategy.lastest;
 
     private static final String KEY_SORT_SDK_CLIENT_NUM = "sortSdkClientNum";
+    private static final String KEY_TASK_NAME = "taskName";
     private static final int DEFAULT_SORT_SDK_CLIENT_NUM = 1;
 
     private String taskName;
@@ -149,9 +147,9 @@ public final class SortSdkSource extends AbstractSource
      */
     @Override
     public void configure(Context context) {
-        this.taskName = context.getString(FlumeConfigGenerator.KEY_TASK_NAME);
+        this.taskName = context.getString(KEY_TASK_NAME);
         this.context = new SortSdkSourceContext(getName(), context);
-        this.sortClusterName = SortClusterConfigHolder.getClusterConfig().getClusterName();
+        this.sortClusterName = CommonPropertiesHolder.getClusterId();
         this.reloadInterval = this.context.getReloadInterval();
         this.initReloadExecutor();
         // register
@@ -235,18 +233,8 @@ public final class SortSdkSource extends AbstractSource
      * @return Map
      */
     private Map<String, String> getSortClientConfigParameters() {
-        Map<String, String> sortSdkParams = new HashMap<>();
         Map<String, String> commonParams = CommonPropertiesHolder.getContext().getSubProperties(SORT_SDK_PREFIX);
-        sortSdkParams.putAll(commonParams);
-        SortTaskConfig taskConfig = SortClusterConfigHolder.getTaskConfig(taskName);
-        if (taskConfig != null) {
-            Map<String, String> sinkParams = taskConfig.getSinkParams();
-            if (sinkParams != null) {
-                Context sinkContext = new Context(sinkParams);
-                sortSdkParams.putAll(sinkContext.getSubProperties(SORT_SDK_PREFIX));
-            }
-        }
-        return sortSdkParams;
+        return new HashMap<>(commonParams);
     }
 
     /**
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/v2/SortSdkSource.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/v2/SortSdkSource.java
deleted file mode 100644
index 43245da9d0..0000000000
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/v2/SortSdkSource.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.standalone.source.sortsdk.v2;
-
-import org.apache.inlong.sdk.commons.admin.AdminServiceRegister;
-import org.apache.inlong.sdk.sort.api.QueryConsumeConfig;
-import org.apache.inlong.sdk.sort.api.SortClient;
-import org.apache.inlong.sdk.sort.api.SortClientConfig;
-import org.apache.inlong.sdk.sort.api.SortClientFactory;
-import org.apache.inlong.sort.standalone.admin.ConsumerServiceMBean;
-import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
-import org.apache.inlong.sort.standalone.config.holder.ManagerUrlHandler;
-import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigType;
-import org.apache.inlong.sort.standalone.config.holder.SortSourceConfigType;
-import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
-import 
org.apache.inlong.sort.standalone.config.loader.ClassResourceQueryConsumeConfig;
-import 
org.apache.inlong.sort.standalone.source.sortsdk.DefaultTopicChangeListener;
-import org.apache.inlong.sort.standalone.source.sortsdk.FetchCallback;
-import org.apache.inlong.sort.standalone.source.sortsdk.SortSdkSourceContext;
-import org.apache.inlong.sort.standalone.utils.v2.FlumeConfigGenerator;
-
-import org.apache.commons.lang3.ClassUtils;
-import org.apache.flume.Context;
-import org.apache.flume.EventDrivenSource;
-import org.apache.flume.conf.Configurable;
-import org.apache.flume.source.AbstractSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Default Source implementation of InLong.
- *
- * <p>
- * SortSdkSource acquired msg from different upstream data store by register 
{@link SortClient} for each sort task. The
- * only things SortSdkSource should do is to get one client by the sort task 
id, or remove one client when the task is
- * finished or schedule to other source instance.
- * </p>
- *
- * <p>
- * The Default Manager of InLong will schedule the partition and topic 
automatically.
- * </p>
- *
- * <p>
- * Because all sources should implement {@link Configurable}, the 
SortSdkSource should have default constructor
- * <b>WITHOUT</b> any arguments, and parameters will be configured by {@link 
Configurable#configure(Context)}.
- * </p>
- */
-public final class SortSdkSource extends AbstractSource
-        implements
-            Configurable,
-            Runnable,
-            EventDrivenSource,
-            ConsumerServiceMBean {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(SortSdkSource.class);
-    public static final String SORT_SDK_PREFIX = "sortsdk.";
-    private static final int CORE_POOL_SIZE = 1;
-    private static final SortClientConfig.ConsumeStrategy defaultStrategy = 
SortClientConfig.ConsumeStrategy.lastest;
-    private static final String KEY_SORT_SDK_CLIENT_NUM = "sortSdkClientNum";
-    private static final int DEFAULT_SORT_SDK_CLIENT_NUM = 1;
-    private String taskName;
-    private SortSdkSourceContext context;
-    private String sortClusterName;
-    private long reloadInterval;
-    private ScheduledExecutorService pool;
-
-    private List<SortClient> sortClients = new ArrayList<>();
-
-    @Override
-    public synchronized void start() {
-        int sortSdkClientNum = 
CommonPropertiesHolder.getInteger(KEY_SORT_SDK_CLIENT_NUM, 
DEFAULT_SORT_SDK_CLIENT_NUM);
-        LOG.info("start SortSdkSource:{}, client num is {}", taskName, 
sortSdkClientNum);
-        for (int i = 0; i < sortSdkClientNum; i++) {
-            SortClient client = this.newClient(taskName);
-            if (client != null) {
-                this.sortClients.add(client);
-            }
-        }
-    }
-
-    @Override
-    public void stop() {
-        pool.shutdownNow();
-        LOG.info("close sort client {}.", taskName);
-        for (SortClient sortClient : sortClients) {
-            sortClient.getConfig().setStopConsume(true);
-            sortClient.close();
-        }
-    }
-
-    @Override
-    public void run() {
-        LOG.info("start to reload SortSdkSource:{}", taskName);
-        for (SortClient sortClient : sortClients) {
-            
sortClient.getConfig().setManagerApiUrl(ManagerUrlHandler.getSortSourceConfigUrl());
-        }
-    }
-
-    @Override
-    public void configure(Context context) {
-        this.taskName = context.getString(FlumeConfigGenerator.KEY_TASK_NAME);
-        this.context = new SortSdkSourceContext(getName(), context);
-        this.sortClusterName = 
SortConfigHolder.getSortConfig().getSortClusterName();
-        this.reloadInterval = this.context.getReloadInterval();
-        this.initReloadExecutor();
-        // register
-        AdminServiceRegister.register(ConsumerServiceMBean.MBEAN_TYPE, 
taskName, this);
-    }
-
-    private void initReloadExecutor() {
-        this.pool = Executors.newScheduledThreadPool(CORE_POOL_SIZE);
-        pool.scheduleAtFixedRate(this, reloadInterval, reloadInterval, 
TimeUnit.SECONDS);
-    }
-
-    private SortClient newClient(final String sortTaskName) {
-        LOG.info("start a new sort client for task: {}", sortTaskName);
-        try {
-            final SortClientConfig clientConfig = new 
SortClientConfig(sortTaskName, this.sortClusterName,
-                    new DefaultTopicChangeListener(),
-                    SortSdkSource.defaultStrategy, 
InetAddress.getLocalHost().getHostAddress());
-            final FetchCallback callback = 
FetchCallback.Factory.create(sortTaskName, getChannelProcessor(), context);
-            clientConfig.setCallback(callback);
-            Map<String, String> sortSdkParams = 
this.getSortClientConfigParameters();
-            clientConfig.setParameters(sortSdkParams);
-
-            // create SortClient
-            String configType = CommonPropertiesHolder
-                    .getString(SortSourceConfigType.KEY_TYPE, 
SortSourceConfigType.MANAGER.name());
-            SortClient client = null;
-            if 
(SortClusterConfigType.FILE.name().equalsIgnoreCase(configType)) {
-                LOG.info("create sort sdk client in file way:{}", configType);
-                ClassResourceQueryConsumeConfig queryConfig = new 
ClassResourceQueryConsumeConfig();
-                client = SortClientFactory.createSortClient(clientConfig, 
queryConfig);
-            } else if 
(SortClusterConfigType.MANAGER.name().equalsIgnoreCase(configType)) {
-                LOG.info("create sort sdk client in manager way:{}", 
configType);
-                
clientConfig.setManagerApiUrl(ManagerUrlHandler.getSortSourceConfigUrl());
-                client = SortClientFactory.createSortClient(clientConfig);
-            } else {
-                LOG.info("create sort sdk client in custom way:{}", 
configType);
-                Class<?> loaderClass = ClassUtils.getClass(configType);
-                Object loaderObject = 
loaderClass.getDeclaredConstructor().newInstance();
-                if (loaderObject instanceof Configurable) {
-                    ((Configurable) loaderObject).configure(new 
Context(CommonPropertiesHolder.get()));
-                }
-                if (!(loaderObject instanceof QueryConsumeConfig)) {
-                    LOG.error("got exception when create QueryConsumeConfig 
instance, config key:{},config class:{}",
-                            SortSourceConfigType.KEY_TYPE, configType);
-                    return null;
-                }
-                // if it specifies the type of QueryConsumeConfig.
-                client = SortClientFactory.createSortClient(clientConfig, 
(QueryConsumeConfig) loaderObject);
-            }
-
-            client.init();
-            callback.setClient(client);
-            return client;
-        } catch (Throwable th) {
-            LOG.error("got one throwable when init client of id:{}", 
sortTaskName, th);
-        }
-        return null;
-    }
-
-    private Map<String, String> getSortClientConfigParameters() {
-        Map<String, String> commonParams = 
CommonPropertiesHolder.getContext().getSubProperties(SORT_SDK_PREFIX);
-        return new HashMap<>(commonParams);
-    }
-
-    @Override
-    public void stopConsumer() {
-        for (SortClient sortClient : sortClients) {
-            sortClient.getConfig().setStopConsume(true);
-        }
-    }
-
-    @Override
-    public void recoverConsumer() {
-        for (SortClient sortClient : sortClients) {
-            sortClient.getConfig().setStopConsume(false);
-        }
-    }
-}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/source/sortsdk/TestSortSdkSource.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/source/sortsdk/TestSortSdkSource.java
index fcac01fc15..a8a972b2e6 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/source/sortsdk/TestSortSdkSource.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/source/sortsdk/TestSortSdkSource.java
@@ -22,6 +22,7 @@ import org.apache.inlong.common.pojo.sortstandalone.SortClusterConfig;
 import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
 import org.apache.inlong.sdk.commons.admin.AdminServiceRegister;
 import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 
 import org.apache.flume.Context;
 import org.junit.Assert;
@@ -45,7 +46,7 @@ import static org.mockito.ArgumentMatchers.anyString;
 @RunWith(PowerMockRunner.class)
 @PowerMockIgnore("javax.management.*")
 @PrepareForTest({SortClusterConfigHolder.class, LoggerFactory.class, Logger.class, MetricRegister.class,
-        AdminServiceRegister.class})
+        AdminServiceRegister.class, InlongLoggerFactory.class})
 public class TestSortSdkSource {
 
     private Context mockContext;
@@ -55,6 +56,8 @@ public class TestSortSdkSource {
         PowerMockito.mockStatic(LoggerFactory.class);
         Logger log = PowerMockito.mock(Logger.class);
         
PowerMockito.when(LoggerFactory.getLogger(Mockito.any(Class.class))).thenReturn(log);
+        PowerMockito.mockStatic(InlongLoggerFactory.class);
+        PowerMockito.when(InlongLoggerFactory.getLogger(Mockito.any(Class.class))).thenReturn(log);
         PowerMockito.mockStatic(MetricRegister.class);
 
         PowerMockito.mockStatic(SortClusterConfigHolder.class);

Reply via email to