This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f3a021bc4b [INLONG-11188][Sort] HTTP SortStandalone supports batch sorting capability (#11189)
f3a021bc4b is described below

commit f3a021bc4ba636b70a8d52db746d3df637c680ef
Author: ChunLiang Lu <[email protected]>
AuthorDate: Tue Sep 24 09:52:30 2024 +0800

    [INLONG-11188][Sort] HTTP SortStandalone supports batch sorting capability (#11189)
---
 .../inlong/sort/standalone/utils/BufferQueue.java  |  11 ++
 .../sink/http/DefaultEvent2HttpRequestHandler.java |  15 ++-
 .../sort/standalone/sink/http/HttpCallback.java    |  74 ++++++++---
 .../standalone/sink/http/HttpChannelWorker.java    | 131 ++++++++++++------
 .../sort/standalone/sink/http/HttpIdConfig.java    |   2 +
 .../standalone/sink/http/HttpOutputChannel.java    | 146 ---------------------
 .../sort/standalone/sink/http/HttpRequest.java     |  31 ++++-
 .../inlong/sort/standalone/sink/http/HttpSink.java |  80 +++++++++--
 .../sort/standalone/sink/http/HttpSinkContext.java |  42 +++---
 .../sort/standalone/sink/http/HttpSinkFactory.java |   4 -
 .../sink/http/IEvent2HttpRequestHandler.java       |   5 +
 11 files changed, 299 insertions(+), 242 deletions(-)

diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/BufferQueue.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/BufferQueue.java
index f9c05d40f2..8ab31c76bc 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/BufferQueue.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/BufferQueue.java
@@ -41,6 +41,17 @@ public class BufferQueue<A> {
         this.currentTokens = new SizeSemaphore(maxSizeKb, SizeSemaphore.ONEKB);
     }
 
+    /**
+     * Constructor
+     * 
+     * @param maxSizeKb
+     * @param queue
+     */
+    public BufferQueue(int maxSizeKb, LinkedBlockingQueue<A> queue) {
+        this.currentTokens = new SizeSemaphore(maxSizeKb, SizeSemaphore.ONEKB);
+        this.queue = queue;
+    }
+
     /**
      * Constructor
      * 
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/DefaultEvent2HttpRequestHandler.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/DefaultEvent2HttpRequestHandler.java
index 43f1cd9761..0a3b90d924 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/DefaultEvent2HttpRequestHandler.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/DefaultEvent2HttpRequestHandler.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sort.standalone.sink.http;
 
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.dispatch.DispatchProfile;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 import org.apache.inlong.sort.standalone.utils.UnescapeHelper;
 
@@ -36,6 +37,7 @@ import java.io.UnsupportedEncodingException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URLEncoder;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -56,7 +58,6 @@ public class DefaultEvent2HttpRequestHandler implements 
IEvent2HttpRequestHandle
         String uid = event.getUid();
         HttpIdConfig idConfig = context.getIdConfig(uid);
         if (idConfig == null) {
-            context.addSendResultMetric(event, context.getTaskName(), false, 
System.currentTimeMillis());
             return null;
         }
         // get the delimiter
@@ -130,11 +131,21 @@ public class DefaultEvent2HttpRequestHandler implements 
IEvent2HttpRequestHandle
                 LOG.error("Unsupported request method: {}", requestMethod);
                 return null;
         }
-        return new HttpRequest(request, event, idConfig.getMaxRetryTimes());
+        return new HttpRequest(request, event, null, 
idConfig.getMaxRetryTimes());
     }
 
     private static void setEntity(HttpEntityEnclosingRequestBase request, 
String jsonData) {
         StringEntity requestEntity = new StringEntity(jsonData, 
ContentType.APPLICATION_JSON);
         request.setEntity(requestEntity);
     }
+
+    public List<HttpRequest> parse(HttpSinkContext context, DispatchProfile 
dispatchProfile)
+            throws URISyntaxException, JsonProcessingException {
+        List<HttpRequest> requests = new ArrayList<>();
+        for (ProfileEvent profileEvent : dispatchProfile.getEvents()) {
+            HttpRequest request = this.parse(context, profileEvent);
+            requests.add(request);
+        }
+        return requests;
+    }
 }
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpCallback.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpCallback.java
index 59ef12f5e0..7b65c59617 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpCallback.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpCallback.java
@@ -18,12 +18,18 @@
 package org.apache.inlong.sort.standalone.sink.http;
 
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.dispatch.DispatchManager;
+import org.apache.inlong.sort.standalone.dispatch.DispatchProfile;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 
+import org.apache.http.HttpEntity;
 import org.apache.http.HttpResponse;
 import org.apache.http.concurrent.FutureCallback;
 import org.slf4j.Logger;
 
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+
 public class HttpCallback implements FutureCallback<HttpResponse> {
 
     public static final Logger LOG = 
InlongLoggerFactory.getLogger(HttpCallback.class);
@@ -39,25 +45,47 @@ public class HttpCallback implements 
FutureCallback<HttpResponse> {
     @Override
     public void completed(HttpResponse httpResponse) {
         int statusCode = httpResponse.getStatusLine().getStatusCode();
-        ProfileEvent event = requestItem.getEvent();
         long sendTime = requestItem.getSendTime();
 
         // is fail
         if (statusCode != 200) {
-            handleFailedRequest(event, sendTime);
+            HttpEntity entity = httpResponse.getEntity();
+            try (InputStream is = entity.getContent()) {
+                int len = (int) entity.getContentLength();
+                ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                byte[] readed = new byte[len];
+                is.read(readed);
+                baos.write(readed);
+                String content = new String(readed);
+                LOG.error("Fail to send http,statusCode:{},content:{}",
+                        statusCode, content);
+            } catch (Throwable t) {
+                LOG.error(t.getMessage(), t);
+            }
+            handleFailedRequest(sendTime);
         } else {
-            context.addSendResultMetric(event, context.getTaskName(), true, 
sendTime);
-            context.releaseDispatchQueue(requestItem);
-            event.ack();
+            if (requestItem.getProfileEvent() != null) {
+                ProfileEvent profileEvent = requestItem.getProfileEvent();
+                context.addSendResultMetric(profileEvent, 
context.getTaskName(), true, sendTime);
+                context.releaseDispatchQueue(profileEvent);
+                profileEvent.ack();
+            }
+            if (requestItem.getDispatchProfile() != null) {
+                DispatchProfile dispatchProfile = 
requestItem.getDispatchProfile();
+                for (ProfileEvent profileEvent : dispatchProfile.getEvents()) {
+                    context.addSendResultMetric(profileEvent, 
context.getTaskName(), true, sendTime);
+                    context.releaseDispatchQueue(profileEvent);
+                    profileEvent.ack();
+                }
+            }
         }
     }
 
     @Override
     public void failed(Exception e) {
         LOG.error("Http request failed,errorMsg:{}", e.getMessage(), e);
-        ProfileEvent event = requestItem.getEvent();
         long sendTime = requestItem.getSendTime();
-        handleFailedRequest(event, sendTime);
+        handleFailedRequest(sendTime);
     }
 
     @Override
@@ -65,16 +93,30 @@ public class HttpCallback implements 
FutureCallback<HttpResponse> {
         LOG.info("Request cancelled");
     }
 
-    private void handleFailedRequest(ProfileEvent event, long sendTime) {
+    private void handleFailedRequest(long sendTime) {
         int remainRetryTimes = requestItem.getRemainRetryTimes();
-        context.addSendResultMetric(event, context.getTaskName(), false, 
sendTime);
-        // if reach the max retry times, release the request
-        if (remainRetryTimes == 1) {
-            context.releaseDispatchQueue(requestItem);
-            return;
-        } else if (remainRetryTimes > 1) {
-            requestItem.setRemainRetryTimes(remainRetryTimes - 1);
+        if (requestItem.getProfileEvent() != null) {
+            ProfileEvent profileEvent = requestItem.getProfileEvent();
+            context.addSendResultMetric(profileEvent, context.getTaskName(), 
false, sendTime);
+            // if reach the max retry times, release the request
+            if (remainRetryTimes == 1) {
+                context.releaseDispatchQueue(profileEvent);
+                return;
+            } else if (remainRetryTimes > 1) {
+                requestItem.setRemainRetryTimes(remainRetryTimes - 1);
+            }
+            long dispatchTime = profileEvent.getRawLogTime() - 
profileEvent.getRawLogTime() % DispatchManager.MINUTE_MS;
+            DispatchProfile dispatchProfile = new 
DispatchProfile(profileEvent.getUid(),
+                    profileEvent.getInlongGroupId(), 
profileEvent.getInlongStreamId(),
+                    dispatchTime);
+            dispatchProfile.addEvent(profileEvent, 
DispatchManager.DEFAULT_DISPATCH_MAX_PACKCOUNT, Integer.MAX_VALUE);
+            context.backDispatchQueue(dispatchProfile);
+        }
+        if (requestItem.getDispatchProfile() != null) {
+            DispatchProfile dispatchProfile = requestItem.getDispatchProfile();
+            dispatchProfile.getEvents()
+                    .forEach(v -> context.addSendResultMetric(v, 
context.getTaskName(), false, sendTime));
+            context.backDispatchQueue(dispatchProfile);
         }
-        context.backDispatchQueue(requestItem);
     }
 }
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpChannelWorker.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpChannelWorker.java
index 673051cdab..ac42dbf3d6 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpChannelWorker.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpChannelWorker.java
@@ -18,14 +18,25 @@
 package org.apache.inlong.sort.standalone.sink.http;
 
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.dispatch.DispatchProfile;
 
-import org.apache.flume.Channel;
-import org.apache.flume.Event;
-import org.apache.flume.Transaction;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+import org.apache.http.impl.nio.client.HttpAsyncClients;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.List;
+
 public class HttpChannelWorker extends Thread {
 
     public static final Logger LOG = 
LoggerFactory.getLogger(HttpChannelWorker.class);
@@ -35,6 +46,7 @@ public class HttpChannelWorker extends Thread {
 
     private LifecycleState status;
     private IEvent2HttpRequestHandler handler;
+    private CloseableHttpAsyncClient httpClient;
 
     public HttpChannelWorker(HttpSinkContext context, int workerIndex) {
         this.context = context;
@@ -47,6 +59,7 @@ public class HttpChannelWorker extends Thread {
     public void run() {
         status = LifecycleState.START;
         LOG.info("Starting HttpChannelWorker:{},status:{},index:{}", 
context.getTaskName(), status, workerIndex);
+        this.initHttpClient();
         while (status == LifecycleState.START) {
             try {
                 this.doRun();
@@ -57,47 +70,91 @@ public class HttpChannelWorker extends Thread {
         }
     }
 
-    public void doRun() {
-        Channel channel = context.getChannel();
-        Transaction tx = channel.getTransaction();
-        tx.begin();
-        try {
-            Event event = channel.take();
-            if (event == null) {
-                tx.commit();
-                Thread.sleep(context.getProcessInterval());
-                return;
-            }
-            if (!(event instanceof ProfileEvent)) {
-                tx.commit();
-                this.context.addSendFailMetric();
-                Thread.sleep(context.getProcessInterval());
-                return;
-            }
-            // to profileEvent
-            ProfileEvent profileEvent = (ProfileEvent) event;
-            HttpRequest httpRequest = handler.parse(context, profileEvent);
-            // offer queue
-            if (httpRequest != null) {
-                context.offerDispatchQueue(httpRequest);
-            } else {
-                context.addSendFailMetric();
+    public void doRun() throws InterruptedException, JsonProcessingException, 
URISyntaxException {
+        DispatchProfile dispatchProfile = context.takeDispatchQueue();
+        if (dispatchProfile == null) {
+            Thread.sleep(context.getProcessInterval());
+            return;
+        }
+        // check id config
+        String uid = dispatchProfile.getUid();
+        if (context.getIdConfig(uid) == null) {
+            for (ProfileEvent profileEvent : dispatchProfile.getEvents()) {
+                context.addSendResultMetric(profileEvent, 
context.getTaskName(), false, System.currentTimeMillis());
                 profileEvent.ack();
             }
-            tx.commit();
-        } catch (Throwable t) {
-            LOG.error("Process event failed!{}", this.getName(), t);
-            try {
-                tx.rollback();
-            } catch (Throwable e) {
-                LOG.error("Channel take transaction rollback exception:{}", 
getName(), e);
+            return;
+        }
+        // send
+        try {
+            // parse request
+            List<HttpRequest> requests = handler.parse(context, 
dispatchProfile);
+            // check request
+            if (requests == null) {
+                for (ProfileEvent profileEvent : dispatchProfile.getEvents()) {
+                    context.addSendResultMetric(profileEvent, 
context.getTaskName(), false, System.currentTimeMillis());
+                    context.releaseDispatchQueue(dispatchProfile);
+                    profileEvent.ack();
+                }
+            }
+            for (HttpRequest request : requests) {
+                httpClient.execute(request.getRequest(), new 
HttpCallback(context, request));
+                for (ProfileEvent profileEvent : dispatchProfile.getEvents()) {
+                    context.addSendMetric(profileEvent, context.getTaskName());
+                }
             }
-        } finally {
-            tx.close();
+        } catch (Throwable e) {
+            LOG.error("Failed to send HttpRequest uid:{},error:{}", 
dispatchProfile.getUid(), e.getMessage(), e);
+            context.backDispatchQueue(dispatchProfile);
+            this.initHttpClient();
+            Thread.sleep(context.getProcessInterval());
         }
     }
 
     public void close() {
         this.status = LifecycleState.STOP;
     }
+
+    private void initHttpClient() {
+        if (httpClient != null) {
+            try {
+                httpClient.close();
+            } catch (IOException e) {
+                LOG.error(String.format("close HttpClient:%s", 
e.getMessage()), e);
+            }
+            httpClient = null;
+        }
+        try {
+            if (httpClient == null) {
+                String userName = context.getUsername();
+                String password = context.getPassword();
+                LOG.info("initHttpAsyncClient:url:{}", context.getBaseUrl());
+
+                HttpAsyncClientBuilder builder = HttpAsyncClients.custom();
+                final CredentialsProvider provider = new 
BasicCredentialsProvider();
+                if (context.getEnableCredential()) {
+                    provider.setCredentials(AuthScope.ANY,
+                            new UsernamePasswordCredentials(userName, 
password));
+                    builder.setDefaultCredentialsProvider(provider);
+                }
+
+                RequestConfig requestConfig = RequestConfig.custom()
+                        
.setConnectionRequestTimeout(context.getConnectionRequestTimeout())
+                        .setSocketTimeout(context.getSocketTimeout())
+                        .setMaxRedirects(context.getMaxRedirects())
+                        .setConnectTimeout(120 * 1000)
+                        .build();
+
+                builder.setDefaultRequestConfig(requestConfig)
+                        .setMaxConnTotal(context.getMaxConnect())
+                        .setMaxConnPerRoute(context.getMaxConnectPerRoute());
+
+                httpClient = HttpSinkFactory.createHttpAsyncClient(builder);
+                httpClient.start();
+            }
+        } catch (Exception e) {
+            LOG.error("init httpclient failed,error:{}", e.getMessage(), e);
+            httpClient = null;
+        }
+    }
 }
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpIdConfig.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpIdConfig.java
index 602ef1a793..e209b511b2 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpIdConfig.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpIdConfig.java
@@ -50,6 +50,7 @@ public class HttpIdConfig extends IdConfig {
     private List<String> fieldList;
     private Charset sourceCharset;
     private Charset sinkCharset;
+    private DataFlowConfig dataFlowConfig;
 
     public static HttpIdConfig create(DataFlowConfig dataFlowConfig) {
         HttpSinkConfig sinkConfig = (HttpSinkConfig) 
dataFlowConfig.getSinkConfig();
@@ -83,6 +84,7 @@ public class HttpIdConfig extends IdConfig {
                 .fieldList(fields)
                 .sinkCharset(sinkCharset)
                 .sourceCharset(sourceCharset)
+                .dataFlowConfig(dataFlowConfig)
                 .build();
     }
 }
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpOutputChannel.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpOutputChannel.java
deleted file mode 100644
index db7e8da923..0000000000
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpOutputChannel.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.standalone.sink.http;
-
-import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
-
-import org.apache.flume.lifecycle.LifecycleState;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.client.config.RequestConfig;
-import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
-import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
-import org.apache.http.impl.nio.client.HttpAsyncClients;
-import org.slf4j.Logger;
-
-import java.io.IOException;
-
-public class HttpOutputChannel extends Thread {
-
-    public static final Logger LOG = 
InlongLoggerFactory.getLogger(HttpOutputChannel.class);
-
-    private LifecycleState status;
-    private HttpSinkContext context;
-    private CloseableHttpAsyncClient httpClient;
-
-    public HttpOutputChannel(HttpSinkContext context) {
-        super(context.getTaskName());
-        this.context = context;
-        this.status = LifecycleState.IDLE;
-    }
-
-    public void init() {
-        initHttpClient();
-    }
-
-    private boolean initHttpClient() {
-        try {
-            if (httpClient == null) {
-                String userName = context.getUsername();
-                String password = context.getPassword();
-                LOG.info("initHttpAsyncClient:url:{}", context.getBaseUrl());
-
-                HttpAsyncClientBuilder builder = HttpAsyncClients.custom();
-                final CredentialsProvider provider = new 
BasicCredentialsProvider();
-                if (context.getEnableCredential()) {
-                    provider.setCredentials(AuthScope.ANY,
-                            new UsernamePasswordCredentials(userName, 
password));
-                    builder.setDefaultCredentialsProvider(provider);
-                }
-
-                RequestConfig requestConfig = RequestConfig.custom()
-                        
.setConnectionRequestTimeout(context.getConnectionRequestTimeout())
-                        .setSocketTimeout(context.getSocketTimeout())
-                        .setMaxRedirects(context.getMaxRedirects())
-                        .setConnectTimeout(120 * 1000)
-                        .build();
-
-                builder.setDefaultRequestConfig(requestConfig)
-                        .setMaxConnTotal(context.getMaxConnect())
-                        .setMaxConnPerRoute(context.getMaxConnectPerRoute());
-
-                httpClient = HttpSinkFactory.createHttpAsyncClient(builder);
-                httpClient.start();
-            }
-        } catch (Exception e) {
-            LOG.error("init httpclient failed.", e);
-            httpClient = null;
-            return false;
-        }
-        return true;
-    }
-
-    public void close() {
-        status = LifecycleState.STOP;
-        try {
-            httpClient.close();
-        } catch (IOException e) {
-            LOG.error(String.format("close HttpClient:%s", e.getMessage()), e);
-        }
-    }
-
-    @Override
-    public void run() {
-        status = LifecycleState.START;
-        LOG.info("Starting HttpOutputChannel:{},status:{}", 
context.getTaskName(), status);
-        while (status == LifecycleState.START) {
-            try {
-                send();
-            } catch (Throwable t) {
-                LOG.error("Error occurred while starting 
HttpOutputChannel:{},status:{}", context.getTaskName(), status,
-                        t);
-            }
-        }
-    }
-
-    public void send() throws InterruptedException {
-        HttpRequest httpRequest = null;
-        try {
-            // get httpRequest
-            httpRequest = context.takeDispatchQueue();
-            if (httpRequest == null) {
-                Thread.sleep(context.getProcessInterval());
-                return;
-            }
-            // get id config
-            String uid = httpRequest.getEvent().getUid();
-            if (context.getIdConfig(uid) == null) {
-                context.addSendResultMetric(httpRequest.getEvent(), 
context.getTaskName(), false,
-                        httpRequest.getSendTime());
-                return;
-            }
-            // send
-            httpClient.execute(httpRequest.getRequest(), new 
HttpCallback(context, httpRequest));
-            context.addSendMetric(httpRequest.getEvent(), 
context.getTaskName());
-        } catch (Throwable e) {
-            LOG.error("Failed to send HttpRequest '{}': {}", httpRequest, 
e.getMessage(), e);
-            if (httpRequest != null) {
-                context.backDispatchQueue(httpRequest);
-                context.addSendResultMetric(httpRequest.getEvent(), 
context.getTaskName(), false,
-                        httpRequest.getSendTime());
-            }
-            try {
-                Thread.sleep(context.getProcessInterval());
-            } catch (InterruptedException e1) {
-                LOG.error("Thread interrupted while sleeping, error: {}", 
e1.getMessage(), e1);
-            }
-        }
-    }
-}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpRequest.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpRequest.java
index d1731a61b6..d5fd1b7179 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpRequest.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpRequest.java
@@ -18,19 +18,23 @@
 package org.apache.inlong.sort.standalone.sink.http;
 
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.dispatch.DispatchProfile;
 
 import org.apache.http.client.methods.HttpUriRequest;
 
 public class HttpRequest {
 
     private final HttpUriRequest request;
-    private final ProfileEvent event;
+    private final ProfileEvent profileEvent;
+    private final DispatchProfile dispatchProfile;
     private final long sendTime;
     private int remainRetryTimes;
 
-    public HttpRequest(HttpUriRequest request, ProfileEvent event, int 
remainRetryTimes) {
+    public HttpRequest(HttpUriRequest request, ProfileEvent profileEvent, 
DispatchProfile dispatchProfile,
+            int remainRetryTimes) {
         this.request = request;
-        this.event = event;
+        this.profileEvent = profileEvent;
+        this.dispatchProfile = dispatchProfile;
         this.sendTime = System.currentTimeMillis();
         this.remainRetryTimes = remainRetryTimes;
     }
@@ -39,10 +43,6 @@ public class HttpRequest {
         return request;
     }
 
-    public ProfileEvent getEvent() {
-        return event;
-    }
-
     public long getSendTime() {
         return sendTime;
     }
@@ -54,4 +54,21 @@ public class HttpRequest {
     public void setRemainRetryTimes(int remainRetryTimes) {
         this.remainRetryTimes = remainRetryTimes;
     }
+
+    /**
+     * get profileEvent
+     * @return the profileEvent
+     */
+    public ProfileEvent getProfileEvent() {
+        return profileEvent;
+    }
+
+    /**
+     * get dispatchProfile
+     * @return the dispatchProfile
+     */
+    public DispatchProfile getDispatchProfile() {
+        return dispatchProfile;
+    }
+
 }
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSink.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSink.java
index 9da6fa0bff..90ba642019 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSink.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSink.java
@@ -17,11 +17,18 @@
 
 package org.apache.inlong.sort.standalone.sink.http;
 
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.dispatch.DispatchManager;
+import org.apache.inlong.sort.standalone.dispatch.DispatchProfile;
 import org.apache.inlong.sort.standalone.sink.SinkContext;
 import org.apache.inlong.sort.standalone.utils.BufferQueue;
 
+import org.apache.flume.Channel;
 import org.apache.flume.Context;
+import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.sink.AbstractSink;
 import org.slf4j.Logger;
@@ -29,24 +36,43 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 public class HttpSink extends AbstractSink implements Configurable {
 
     public static final Logger LOG = LoggerFactory.getLogger(HttpSink.class);
 
     private Context parentContext;
-    private BufferQueue<HttpRequest> dispatchQueue;
+    // dispatch
+    private DispatchManager dispatchManager;
+    private BufferQueue<DispatchProfile> dispatchQueue;
+    private ScheduledExecutorService scheduledPool;
     private HttpSinkContext context;
     // workers
     private List<HttpChannelWorker> workers = new ArrayList<>();
-    // output
-    private HttpOutputChannel outputChannel;
 
     @Override
     public void start() {
         super.start();
         try {
-            this.dispatchQueue = SinkContext.createBufferQueue();
+            // dispatch
+            LinkedBlockingQueue<DispatchProfile> bufferQueue = new 
LinkedBlockingQueue<>();
+            int maxBufferQueueSizeKb = 
CommonPropertiesHolder.getInteger(SinkContext.KEY_MAX_BUFFERQUEUE_SIZE_KB,
+                    SinkContext.DEFAULT_MAX_BUFFERQUEUE_SIZE_KB);
+            this.dispatchQueue = new BufferQueue<>(maxBufferQueueSizeKb, 
bufferQueue);
+            this.dispatchManager = new DispatchManager(parentContext, 
bufferQueue);
+            this.scheduledPool = Executors.newSingleThreadScheduledExecutor();
+            this.scheduledPool.scheduleWithFixedDelay(new Runnable() {
+
+                public void run() {
+                    dispatchManager.setNeedOutputOvertimeData();
+                }
+            }, this.dispatchManager.getDispatchTimeout(), 
this.dispatchManager.getDispatchTimeout(),
+                    TimeUnit.MILLISECONDS);
+            // send queue
             this.context = new HttpSinkContext(getName(), parentContext, 
getChannel(), dispatchQueue);
             this.context.start();
             for (int i = 0; i < context.getMaxThreads(); i++) {
@@ -54,9 +80,6 @@ public class HttpSink extends AbstractSink implements Configurable {
                 this.workers.add(worker);
                 worker.start();
             }
-            this.outputChannel = HttpSinkFactory.createHttpOutputChannel(context);
-            this.outputChannel.init();
-            this.outputChannel.start();
         } catch (Exception e) {
             LOG.error("Failed to start HttpSink '{}': {}", this.getName(), e.getMessage());
         }
@@ -71,7 +94,6 @@ public class HttpSink extends AbstractSink implements Configurable {
                 worker.close();
             }
             this.workers.clear();
-            this.outputChannel.close();
         } catch (Exception e) {
             LOG.error("Failed to stop HttpSink '{}': {}", this.getName(), e.getMessage());
         }
@@ -85,6 +107,46 @@ public class HttpSink extends AbstractSink implements Configurable {
 
     @Override
     public Status process() throws EventDeliveryException {
-        return Status.BACKOFF;
+        dispatchManager.outputOvertimeData();
+        Channel channel = getChannel();
+        Transaction tx = channel.getTransaction();
+        tx.begin();
+        try {
+            Event event = channel.take();
+            if (event == null) {
+                tx.commit();
+                return Status.BACKOFF;
+            }
+            if (event instanceof ProfileEvent) {
+                ProfileEvent profileEvent = (ProfileEvent) event;
+                this.dispatchQueue.acquire(profileEvent.getBody().length);
+                this.dispatchManager.addEvent(profileEvent);
+                tx.commit();
+                return Status.READY;
+            } else if (event instanceof DispatchProfile) {
+                DispatchProfile dispatchProfile = (DispatchProfile) event;
+                for (ProfileEvent profileEvent : dispatchProfile.getEvents()) {
+                    this.dispatchQueue.acquire(profileEvent.getBody().length);
+                    this.dispatchManager.addEvent(profileEvent);
+                }
+                tx.commit();
+                return Status.READY;
+            } else {
+                LOG.error("event is not ProfileEvent or DispatchProfile,class:{}", event.getClass());
+                tx.commit();
+                this.context.addSendFailMetric();
+                return Status.READY;
+            }
+        } catch (Throwable t) {
+            LOG.error("Process event failed!" + this.getName(), t);
+            try {
+                tx.rollback();
+            } catch (Throwable e) {
+                LOG.error("Channel take transaction rollback exception:" + getName(), e);
+            }
+            return Status.BACKOFF;
+        } finally {
+            tx.close();
+        }
     }
 }
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSinkContext.java
index 9228cf3957..adc1d30089 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSinkContext.java
@@ -26,6 +26,7 @@ import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
 import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
 import org.apache.inlong.sort.standalone.config.pojo.InlongId;
+import org.apache.inlong.sort.standalone.dispatch.DispatchProfile;
 import org.apache.inlong.sort.standalone.metrics.SortConfigMetricReporter;
 import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
 import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
@@ -50,6 +51,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
+@SuppressWarnings("deprecation")
 public class HttpSinkContext extends SinkContext {
 
     public static final Logger LOG = InlongLoggerFactory.getLogger(HttpSinkContext.class);
@@ -79,7 +81,7 @@ public class HttpSinkContext extends SinkContext {
     private String nodeId;
     private Map<String, HttpIdConfig> idConfigMap = new ConcurrentHashMap<>();
     private ObjectMapper objectMapper = new ObjectMapper();
-    private final BufferQueue<HttpRequest> dispatchQueue;
+    private final BufferQueue<DispatchProfile> dispatchQueue;
     private AtomicLong offerCounter = new AtomicLong(0);
     private AtomicLong takeCounter = new AtomicLong(0);
     private AtomicLong backCounter = new AtomicLong(0);
@@ -96,7 +98,7 @@ public class HttpSinkContext extends SinkContext {
     private int logMaxLength = DEFAULT_LOG_MAX_LENGTH;
 
     public HttpSinkContext(String sinkName, Context context, Channel channel,
-            BufferQueue<HttpRequest> dispatchQueue) {
+            BufferQueue<DispatchProfile> dispatchQueue) {
         super(sinkName, context, channel);
         this.sinkContext = context;
         this.dispatchQueue = dispatchQueue;
@@ -190,8 +192,8 @@ public class HttpSinkContext extends SinkContext {
         this.maxConnect = httpNodeConfig.getMaxConnect();
 
         this.maxConnectPerRoute = sinkContext.getInteger(KEY_MAX_CONNECT_PER_ROUTE, DEFAULT_MAX_CONNECT_PER_ROUTE);
-        this.connectionRequestTimeout =
-                sinkContext.getInteger(KEY_CONNECTION_REQUEST_TIMEOUT, DEFAULT_CONNECTION_REQUEST_TIMEOUT);
+        this.connectionRequestTimeout = sinkContext.getInteger(KEY_CONNECTION_REQUEST_TIMEOUT,
+                DEFAULT_CONNECTION_REQUEST_TIMEOUT);
         this.socketTimeout = sinkContext.getInteger(KEY_SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT);
         this.maxRedirects = sinkContext.getInteger(KEY_MAX_REDIRECTS, DEFAULT_MAX_REDIRECTS);
         this.logMaxLength = sinkContext.getInteger(KEY_LOG_MAX_LENGTH, DEFAULT_LOG_MAX_LENGTH);
@@ -206,8 +208,8 @@ public class HttpSinkContext extends SinkContext {
         this.maxConnect = sinkContext.getInteger(KEY_MAX_CONNECT_TOTAL, DEFAULT_MAX_CONNECT_TOTAL);
 
         this.maxConnectPerRoute = sinkContext.getInteger(KEY_MAX_CONNECT_PER_ROUTE, DEFAULT_MAX_CONNECT_PER_ROUTE);
-        this.connectionRequestTimeout =
-                sinkContext.getInteger(KEY_CONNECTION_REQUEST_TIMEOUT, DEFAULT_CONNECTION_REQUEST_TIMEOUT);
+        this.connectionRequestTimeout = sinkContext.getInteger(KEY_CONNECTION_REQUEST_TIMEOUT,
+                DEFAULT_CONNECTION_REQUEST_TIMEOUT);
         this.socketTimeout = sinkContext.getInteger(KEY_SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT);
         this.maxRedirects = sinkContext.getInteger(KEY_MAX_REDIRECTS, DEFAULT_MAX_REDIRECTS);
         this.logMaxLength = sinkContext.getInteger(KEY_LOG_MAX_LENGTH, DEFAULT_LOG_MAX_LENGTH);
@@ -239,7 +241,7 @@ public class HttpSinkContext extends SinkContext {
         long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
         dimensions.put(SortMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
         SortMetricItem metricItem = this.getMetricItemSet().findMetricItem(dimensions);
-        metricItem.readFailCount.incrementAndGet();
+        metricItem.sendFailCount.incrementAndGet();
     }
 
     public void addSendResultMetric(ProfileEvent currentRecord, String bid, boolean result, long sendTime) {
@@ -293,27 +295,25 @@ public class HttpSinkContext extends SinkContext {
         this.sinkContext = sinkContext;
     }
 
-    public void offerDispatchQueue(HttpRequest httpRequest) {
-        this.offerCounter.incrementAndGet();
-        dispatchQueue.acquire(httpRequest.getEvent().getBody().length);
-        dispatchQueue.offer(httpRequest);
-    }
-
-    public HttpRequest takeDispatchQueue() {
-        HttpRequest httpRequest = this.dispatchQueue.pollRecord();
-        if (httpRequest != null) {
+    public DispatchProfile takeDispatchQueue() {
+        DispatchProfile dispatchProfile = this.dispatchQueue.pollRecord();
+        if (dispatchProfile != null) {
             this.takeCounter.incrementAndGet();
         }
-        return httpRequest;
+        return dispatchProfile;
     }
 
-    public void backDispatchQueue(HttpRequest httpRequest) {
+    public void backDispatchQueue(DispatchProfile dispatchProfile) {
         this.backCounter.incrementAndGet();
-        dispatchQueue.offer(httpRequest);
+        dispatchQueue.offer(dispatchProfile);
+    }
+
+    public void releaseDispatchQueue(ProfileEvent event) {
+        dispatchQueue.release(event.getBody().length);
     }
 
-    public void releaseDispatchQueue(HttpRequest httpRequest) {
-        dispatchQueue.release(httpRequest.getEvent().getBody().length);
+    public void releaseDispatchQueue(DispatchProfile dispatchProfile) {
+        dispatchQueue.release(dispatchProfile.getSize());
     }
 
     public String getBaseUrl() {
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSinkFactory.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSinkFactory.java
index 6125297bae..a5a8d0f90f 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSinkFactory.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSinkFactory.java
@@ -22,10 +22,6 @@ import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
 
 public class HttpSinkFactory {
 
-    public static HttpOutputChannel createHttpOutputChannel(HttpSinkContext context) {
-        return new HttpOutputChannel(context);
-    }
-
     public static CloseableHttpAsyncClient createHttpAsyncClient(HttpAsyncClientBuilder builder) {
         return builder.build();
     }
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/IEvent2HttpRequestHandler.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/IEvent2HttpRequestHandler.java
index 65502fb50e..0268c40821 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/IEvent2HttpRequestHandler.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/IEvent2HttpRequestHandler.java
@@ -18,12 +18,17 @@
 package org.apache.inlong.sort.standalone.sink.http;
 
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.dispatch.DispatchProfile;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 
 import java.net.URISyntaxException;
+import java.util.List;
 
 public interface IEvent2HttpRequestHandler {
 
     HttpRequest parse(HttpSinkContext context, ProfileEvent event) throws URISyntaxException, JsonProcessingException;
+
+    List<HttpRequest> parse(HttpSinkContext context, DispatchProfile dispatchProfile)
+            throws URISyntaxException, JsonProcessingException;
 }


Reply via email to