This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f3a021bc4b [INLONG-11188][Sort] HTTP SortStandalone supports batch sorting capability (#11189)
f3a021bc4b is described below
commit f3a021bc4ba636b70a8d52db746d3df637c680ef
Author: ChunLiang Lu <[email protected]>
AuthorDate: Tue Sep 24 09:52:30 2024 +0800
[INLONG-11188][Sort] HTTP SortStandalone supports batch sorting capability (#11189)
---
.../inlong/sort/standalone/utils/BufferQueue.java | 11 ++
.../sink/http/DefaultEvent2HttpRequestHandler.java | 15 ++-
.../sort/standalone/sink/http/HttpCallback.java | 74 ++++++++---
.../standalone/sink/http/HttpChannelWorker.java | 131 ++++++++++++------
.../sort/standalone/sink/http/HttpIdConfig.java | 2 +
.../standalone/sink/http/HttpOutputChannel.java | 146 ---------------------
.../sort/standalone/sink/http/HttpRequest.java | 31 ++++-
.../inlong/sort/standalone/sink/http/HttpSink.java | 80 +++++++++--
.../sort/standalone/sink/http/HttpSinkContext.java | 42 +++---
.../sort/standalone/sink/http/HttpSinkFactory.java | 4 -
.../sink/http/IEvent2HttpRequestHandler.java | 5 +
11 files changed, 299 insertions(+), 242 deletions(-)
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/BufferQueue.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/BufferQueue.java
index f9c05d40f2..8ab31c76bc 100644
---
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/BufferQueue.java
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/BufferQueue.java
@@ -41,6 +41,17 @@ public class BufferQueue<A> {
this.currentTokens = new SizeSemaphore(maxSizeKb, SizeSemaphore.ONEKB);
}
+ /**
+ * Constructor
+ *
+ * @param maxSizeKb
+ * @param queue
+ */
+ public BufferQueue(int maxSizeKb, LinkedBlockingQueue<A> queue) {
+ this.currentTokens = new SizeSemaphore(maxSizeKb, SizeSemaphore.ONEKB);
+ this.queue = queue;
+ }
+
/**
* Constructor
*
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/DefaultEvent2HttpRequestHandler.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/DefaultEvent2HttpRequestHandler.java
index 43f1cd9761..0a3b90d924 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/DefaultEvent2HttpRequestHandler.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/DefaultEvent2HttpRequestHandler.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sort.standalone.sink.http;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.dispatch.DispatchProfile;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.apache.inlong.sort.standalone.utils.UnescapeHelper;
@@ -36,6 +37,7 @@ import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLEncoder;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -56,7 +58,6 @@ public class DefaultEvent2HttpRequestHandler implements
IEvent2HttpRequestHandle
String uid = event.getUid();
HttpIdConfig idConfig = context.getIdConfig(uid);
if (idConfig == null) {
- context.addSendResultMetric(event, context.getTaskName(), false,
System.currentTimeMillis());
return null;
}
// get the delimiter
@@ -130,11 +131,21 @@ public class DefaultEvent2HttpRequestHandler implements
IEvent2HttpRequestHandle
LOG.error("Unsupported request method: {}", requestMethod);
return null;
}
- return new HttpRequest(request, event, idConfig.getMaxRetryTimes());
+ return new HttpRequest(request, event, null,
idConfig.getMaxRetryTimes());
}
private static void setEntity(HttpEntityEnclosingRequestBase request,
String jsonData) {
StringEntity requestEntity = new StringEntity(jsonData,
ContentType.APPLICATION_JSON);
request.setEntity(requestEntity);
}
+
+ public List<HttpRequest> parse(HttpSinkContext context, DispatchProfile
dispatchProfile)
+ throws URISyntaxException, JsonProcessingException {
+ List<HttpRequest> requests = new ArrayList<>();
+ for (ProfileEvent profileEvent : dispatchProfile.getEvents()) {
+ HttpRequest request = this.parse(context, profileEvent);
+ requests.add(request);
+ }
+ return requests;
+ }
}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpCallback.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpCallback.java
index 59ef12f5e0..7b65c59617 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpCallback.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpCallback.java
@@ -18,12 +18,18 @@
package org.apache.inlong.sort.standalone.sink.http;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.dispatch.DispatchManager;
+import org.apache.inlong.sort.standalone.dispatch.DispatchProfile;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.concurrent.FutureCallback;
import org.slf4j.Logger;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+
public class HttpCallback implements FutureCallback<HttpResponse> {
public static final Logger LOG =
InlongLoggerFactory.getLogger(HttpCallback.class);
@@ -39,25 +45,47 @@ public class HttpCallback implements
FutureCallback<HttpResponse> {
@Override
public void completed(HttpResponse httpResponse) {
int statusCode = httpResponse.getStatusLine().getStatusCode();
- ProfileEvent event = requestItem.getEvent();
long sendTime = requestItem.getSendTime();
// is fail
if (statusCode != 200) {
- handleFailedRequest(event, sendTime);
+ HttpEntity entity = httpResponse.getEntity();
+ try (InputStream is = entity.getContent()) {
+ int len = (int) entity.getContentLength();
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ byte[] readed = new byte[len];
+ is.read(readed);
+ baos.write(readed);
+ String content = new String(readed);
+ LOG.error("Fail to send http,statusCode:{},content:{}",
+ statusCode, content);
+ } catch (Throwable t) {
+ LOG.error(t.getMessage(), t);
+ }
+ handleFailedRequest(sendTime);
} else {
- context.addSendResultMetric(event, context.getTaskName(), true,
sendTime);
- context.releaseDispatchQueue(requestItem);
- event.ack();
+ if (requestItem.getProfileEvent() != null) {
+ ProfileEvent profileEvent = requestItem.getProfileEvent();
+ context.addSendResultMetric(profileEvent,
context.getTaskName(), true, sendTime);
+ context.releaseDispatchQueue(profileEvent);
+ profileEvent.ack();
+ }
+ if (requestItem.getDispatchProfile() != null) {
+ DispatchProfile dispatchProfile =
requestItem.getDispatchProfile();
+ for (ProfileEvent profileEvent : dispatchProfile.getEvents()) {
+ context.addSendResultMetric(profileEvent,
context.getTaskName(), true, sendTime);
+ context.releaseDispatchQueue(profileEvent);
+ profileEvent.ack();
+ }
+ }
}
}
@Override
public void failed(Exception e) {
LOG.error("Http request failed,errorMsg:{}", e.getMessage(), e);
- ProfileEvent event = requestItem.getEvent();
long sendTime = requestItem.getSendTime();
- handleFailedRequest(event, sendTime);
+ handleFailedRequest(sendTime);
}
@Override
@@ -65,16 +93,30 @@ public class HttpCallback implements
FutureCallback<HttpResponse> {
LOG.info("Request cancelled");
}
- private void handleFailedRequest(ProfileEvent event, long sendTime) {
+ private void handleFailedRequest(long sendTime) {
int remainRetryTimes = requestItem.getRemainRetryTimes();
- context.addSendResultMetric(event, context.getTaskName(), false,
sendTime);
- // if reach the max retry times, release the request
- if (remainRetryTimes == 1) {
- context.releaseDispatchQueue(requestItem);
- return;
- } else if (remainRetryTimes > 1) {
- requestItem.setRemainRetryTimes(remainRetryTimes - 1);
+ if (requestItem.getProfileEvent() != null) {
+ ProfileEvent profileEvent = requestItem.getProfileEvent();
+ context.addSendResultMetric(profileEvent, context.getTaskName(),
false, sendTime);
+ // if reach the max retry times, release the request
+ if (remainRetryTimes == 1) {
+ context.releaseDispatchQueue(profileEvent);
+ return;
+ } else if (remainRetryTimes > 1) {
+ requestItem.setRemainRetryTimes(remainRetryTimes - 1);
+ }
+ long dispatchTime = profileEvent.getRawLogTime() -
profileEvent.getRawLogTime() % DispatchManager.MINUTE_MS;
+ DispatchProfile dispatchProfile = new
DispatchProfile(profileEvent.getUid(),
+ profileEvent.getInlongGroupId(),
profileEvent.getInlongStreamId(),
+ dispatchTime);
+ dispatchProfile.addEvent(profileEvent,
DispatchManager.DEFAULT_DISPATCH_MAX_PACKCOUNT, Integer.MAX_VALUE);
+ context.backDispatchQueue(dispatchProfile);
+ }
+ if (requestItem.getDispatchProfile() != null) {
+ DispatchProfile dispatchProfile = requestItem.getDispatchProfile();
+ dispatchProfile.getEvents()
+ .forEach(v -> context.addSendResultMetric(v,
context.getTaskName(), false, sendTime));
+ context.backDispatchQueue(dispatchProfile);
}
- context.backDispatchQueue(requestItem);
}
}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpChannelWorker.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpChannelWorker.java
index 673051cdab..ac42dbf3d6 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpChannelWorker.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpChannelWorker.java
@@ -18,14 +18,25 @@
package org.apache.inlong.sort.standalone.sink.http;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.dispatch.DispatchProfile;
-import org.apache.flume.Channel;
-import org.apache.flume.Event;
-import org.apache.flume.Transaction;
+import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.List;
+
public class HttpChannelWorker extends Thread {
public static final Logger LOG =
LoggerFactory.getLogger(HttpChannelWorker.class);
@@ -35,6 +46,7 @@ public class HttpChannelWorker extends Thread {
private LifecycleState status;
private IEvent2HttpRequestHandler handler;
+ private CloseableHttpAsyncClient httpClient;
public HttpChannelWorker(HttpSinkContext context, int workerIndex) {
this.context = context;
@@ -47,6 +59,7 @@ public class HttpChannelWorker extends Thread {
public void run() {
status = LifecycleState.START;
LOG.info("Starting HttpChannelWorker:{},status:{},index:{}",
context.getTaskName(), status, workerIndex);
+ this.initHttpClient();
while (status == LifecycleState.START) {
try {
this.doRun();
@@ -57,47 +70,91 @@ public class HttpChannelWorker extends Thread {
}
}
- public void doRun() {
- Channel channel = context.getChannel();
- Transaction tx = channel.getTransaction();
- tx.begin();
- try {
- Event event = channel.take();
- if (event == null) {
- tx.commit();
- Thread.sleep(context.getProcessInterval());
- return;
- }
- if (!(event instanceof ProfileEvent)) {
- tx.commit();
- this.context.addSendFailMetric();
- Thread.sleep(context.getProcessInterval());
- return;
- }
- // to profileEvent
- ProfileEvent profileEvent = (ProfileEvent) event;
- HttpRequest httpRequest = handler.parse(context, profileEvent);
- // offer queue
- if (httpRequest != null) {
- context.offerDispatchQueue(httpRequest);
- } else {
- context.addSendFailMetric();
+ public void doRun() throws InterruptedException, JsonProcessingException,
URISyntaxException {
+ DispatchProfile dispatchProfile = context.takeDispatchQueue();
+ if (dispatchProfile == null) {
+ Thread.sleep(context.getProcessInterval());
+ return;
+ }
+ // check id config
+ String uid = dispatchProfile.getUid();
+ if (context.getIdConfig(uid) == null) {
+ for (ProfileEvent profileEvent : dispatchProfile.getEvents()) {
+ context.addSendResultMetric(profileEvent,
context.getTaskName(), false, System.currentTimeMillis());
profileEvent.ack();
}
- tx.commit();
- } catch (Throwable t) {
- LOG.error("Process event failed!{}", this.getName(), t);
- try {
- tx.rollback();
- } catch (Throwable e) {
- LOG.error("Channel take transaction rollback exception:{}",
getName(), e);
+ return;
+ }
+ // send
+ try {
+ // parse request
+ List<HttpRequest> requests = handler.parse(context,
dispatchProfile);
+ // check request
+ if (requests == null) {
+ for (ProfileEvent profileEvent : dispatchProfile.getEvents()) {
+ context.addSendResultMetric(profileEvent,
context.getTaskName(), false, System.currentTimeMillis());
+ context.releaseDispatchQueue(dispatchProfile);
+ profileEvent.ack();
+ }
+ }
+ for (HttpRequest request : requests) {
+ httpClient.execute(request.getRequest(), new
HttpCallback(context, request));
+ for (ProfileEvent profileEvent : dispatchProfile.getEvents()) {
+ context.addSendMetric(profileEvent, context.getTaskName());
+ }
}
- } finally {
- tx.close();
+ } catch (Throwable e) {
+ LOG.error("Failed to send HttpRequest uid:{},error:{}",
dispatchProfile.getUid(), e.getMessage(), e);
+ context.backDispatchQueue(dispatchProfile);
+ this.initHttpClient();
+ Thread.sleep(context.getProcessInterval());
}
}
public void close() {
this.status = LifecycleState.STOP;
}
+
+ private void initHttpClient() {
+ if (httpClient != null) {
+ try {
+ httpClient.close();
+ } catch (IOException e) {
+ LOG.error(String.format("close HttpClient:%s",
e.getMessage()), e);
+ }
+ httpClient = null;
+ }
+ try {
+ if (httpClient == null) {
+ String userName = context.getUsername();
+ String password = context.getPassword();
+ LOG.info("initHttpAsyncClient:url:{}", context.getBaseUrl());
+
+ HttpAsyncClientBuilder builder = HttpAsyncClients.custom();
+ final CredentialsProvider provider = new
BasicCredentialsProvider();
+ if (context.getEnableCredential()) {
+ provider.setCredentials(AuthScope.ANY,
+ new UsernamePasswordCredentials(userName,
password));
+ builder.setDefaultCredentialsProvider(provider);
+ }
+
+ RequestConfig requestConfig = RequestConfig.custom()
+
.setConnectionRequestTimeout(context.getConnectionRequestTimeout())
+ .setSocketTimeout(context.getSocketTimeout())
+ .setMaxRedirects(context.getMaxRedirects())
+ .setConnectTimeout(120 * 1000)
+ .build();
+
+ builder.setDefaultRequestConfig(requestConfig)
+ .setMaxConnTotal(context.getMaxConnect())
+ .setMaxConnPerRoute(context.getMaxConnectPerRoute());
+
+ httpClient = HttpSinkFactory.createHttpAsyncClient(builder);
+ httpClient.start();
+ }
+ } catch (Exception e) {
+ LOG.error("init httpclient failed,error:{}", e.getMessage(), e);
+ httpClient = null;
+ }
+ }
}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpIdConfig.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpIdConfig.java
index 602ef1a793..e209b511b2 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpIdConfig.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpIdConfig.java
@@ -50,6 +50,7 @@ public class HttpIdConfig extends IdConfig {
private List<String> fieldList;
private Charset sourceCharset;
private Charset sinkCharset;
+ private DataFlowConfig dataFlowConfig;
public static HttpIdConfig create(DataFlowConfig dataFlowConfig) {
HttpSinkConfig sinkConfig = (HttpSinkConfig)
dataFlowConfig.getSinkConfig();
@@ -83,6 +84,7 @@ public class HttpIdConfig extends IdConfig {
.fieldList(fields)
.sinkCharset(sinkCharset)
.sourceCharset(sourceCharset)
+ .dataFlowConfig(dataFlowConfig)
.build();
}
}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpOutputChannel.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpOutputChannel.java
deleted file mode 100644
index db7e8da923..0000000000
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpOutputChannel.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.standalone.sink.http;
-
-import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
-
-import org.apache.flume.lifecycle.LifecycleState;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.client.config.RequestConfig;
-import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
-import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
-import org.apache.http.impl.nio.client.HttpAsyncClients;
-import org.slf4j.Logger;
-
-import java.io.IOException;
-
-public class HttpOutputChannel extends Thread {
-
- public static final Logger LOG =
InlongLoggerFactory.getLogger(HttpOutputChannel.class);
-
- private LifecycleState status;
- private HttpSinkContext context;
- private CloseableHttpAsyncClient httpClient;
-
- public HttpOutputChannel(HttpSinkContext context) {
- super(context.getTaskName());
- this.context = context;
- this.status = LifecycleState.IDLE;
- }
-
- public void init() {
- initHttpClient();
- }
-
- private boolean initHttpClient() {
- try {
- if (httpClient == null) {
- String userName = context.getUsername();
- String password = context.getPassword();
- LOG.info("initHttpAsyncClient:url:{}", context.getBaseUrl());
-
- HttpAsyncClientBuilder builder = HttpAsyncClients.custom();
- final CredentialsProvider provider = new
BasicCredentialsProvider();
- if (context.getEnableCredential()) {
- provider.setCredentials(AuthScope.ANY,
- new UsernamePasswordCredentials(userName,
password));
- builder.setDefaultCredentialsProvider(provider);
- }
-
- RequestConfig requestConfig = RequestConfig.custom()
-
.setConnectionRequestTimeout(context.getConnectionRequestTimeout())
- .setSocketTimeout(context.getSocketTimeout())
- .setMaxRedirects(context.getMaxRedirects())
- .setConnectTimeout(120 * 1000)
- .build();
-
- builder.setDefaultRequestConfig(requestConfig)
- .setMaxConnTotal(context.getMaxConnect())
- .setMaxConnPerRoute(context.getMaxConnectPerRoute());
-
- httpClient = HttpSinkFactory.createHttpAsyncClient(builder);
- httpClient.start();
- }
- } catch (Exception e) {
- LOG.error("init httpclient failed.", e);
- httpClient = null;
- return false;
- }
- return true;
- }
-
- public void close() {
- status = LifecycleState.STOP;
- try {
- httpClient.close();
- } catch (IOException e) {
- LOG.error(String.format("close HttpClient:%s", e.getMessage()), e);
- }
- }
-
- @Override
- public void run() {
- status = LifecycleState.START;
- LOG.info("Starting HttpOutputChannel:{},status:{}",
context.getTaskName(), status);
- while (status == LifecycleState.START) {
- try {
- send();
- } catch (Throwable t) {
- LOG.error("Error occurred while starting
HttpOutputChannel:{},status:{}", context.getTaskName(), status,
- t);
- }
- }
- }
-
- public void send() throws InterruptedException {
- HttpRequest httpRequest = null;
- try {
- // get httpRequest
- httpRequest = context.takeDispatchQueue();
- if (httpRequest == null) {
- Thread.sleep(context.getProcessInterval());
- return;
- }
- // get id config
- String uid = httpRequest.getEvent().getUid();
- if (context.getIdConfig(uid) == null) {
- context.addSendResultMetric(httpRequest.getEvent(),
context.getTaskName(), false,
- httpRequest.getSendTime());
- return;
- }
- // send
- httpClient.execute(httpRequest.getRequest(), new
HttpCallback(context, httpRequest));
- context.addSendMetric(httpRequest.getEvent(),
context.getTaskName());
- } catch (Throwable e) {
- LOG.error("Failed to send HttpRequest '{}': {}", httpRequest,
e.getMessage(), e);
- if (httpRequest != null) {
- context.backDispatchQueue(httpRequest);
- context.addSendResultMetric(httpRequest.getEvent(),
context.getTaskName(), false,
- httpRequest.getSendTime());
- }
- try {
- Thread.sleep(context.getProcessInterval());
- } catch (InterruptedException e1) {
- LOG.error("Thread interrupted while sleeping, error: {}",
e1.getMessage(), e1);
- }
- }
- }
-}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpRequest.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpRequest.java
index d1731a61b6..d5fd1b7179 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpRequest.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpRequest.java
@@ -18,19 +18,23 @@
package org.apache.inlong.sort.standalone.sink.http;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.dispatch.DispatchProfile;
import org.apache.http.client.methods.HttpUriRequest;
public class HttpRequest {
private final HttpUriRequest request;
- private final ProfileEvent event;
+ private final ProfileEvent profileEvent;
+ private final DispatchProfile dispatchProfile;
private final long sendTime;
private int remainRetryTimes;
- public HttpRequest(HttpUriRequest request, ProfileEvent event, int
remainRetryTimes) {
+ public HttpRequest(HttpUriRequest request, ProfileEvent profileEvent,
DispatchProfile dispatchProfile,
+ int remainRetryTimes) {
this.request = request;
- this.event = event;
+ this.profileEvent = profileEvent;
+ this.dispatchProfile = dispatchProfile;
this.sendTime = System.currentTimeMillis();
this.remainRetryTimes = remainRetryTimes;
}
@@ -39,10 +43,6 @@ public class HttpRequest {
return request;
}
- public ProfileEvent getEvent() {
- return event;
- }
-
public long getSendTime() {
return sendTime;
}
@@ -54,4 +54,21 @@ public class HttpRequest {
public void setRemainRetryTimes(int remainRetryTimes) {
this.remainRetryTimes = remainRetryTimes;
}
+
+ /**
+ * get profileEvent
+ * @return the profileEvent
+ */
+ public ProfileEvent getProfileEvent() {
+ return profileEvent;
+ }
+
+ /**
+ * get dispatchProfile
+ * @return the dispatchProfile
+ */
+ public DispatchProfile getDispatchProfile() {
+ return dispatchProfile;
+ }
+
}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSink.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSink.java
index 9da6fa0bff..90ba642019 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSink.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSink.java
@@ -17,11 +17,18 @@
package org.apache.inlong.sort.standalone.sink.http;
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.dispatch.DispatchManager;
+import org.apache.inlong.sort.standalone.dispatch.DispatchProfile;
import org.apache.inlong.sort.standalone.sink.SinkContext;
import org.apache.inlong.sort.standalone.utils.BufferQueue;
+import org.apache.flume.Channel;
import org.apache.flume.Context;
+import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
@@ -29,24 +36,43 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
public class HttpSink extends AbstractSink implements Configurable {
public static final Logger LOG = LoggerFactory.getLogger(HttpSink.class);
private Context parentContext;
- private BufferQueue<HttpRequest> dispatchQueue;
+ // dispatch
+ private DispatchManager dispatchManager;
+ private BufferQueue<DispatchProfile> dispatchQueue;
+ private ScheduledExecutorService scheduledPool;
private HttpSinkContext context;
// workers
private List<HttpChannelWorker> workers = new ArrayList<>();
- // output
- private HttpOutputChannel outputChannel;
@Override
public void start() {
super.start();
try {
- this.dispatchQueue = SinkContext.createBufferQueue();
+ // dispatch
+ LinkedBlockingQueue<DispatchProfile> bufferQueue = new
LinkedBlockingQueue<>();
+ int maxBufferQueueSizeKb =
CommonPropertiesHolder.getInteger(SinkContext.KEY_MAX_BUFFERQUEUE_SIZE_KB,
+ SinkContext.DEFAULT_MAX_BUFFERQUEUE_SIZE_KB);
+ this.dispatchQueue = new BufferQueue<>(maxBufferQueueSizeKb,
bufferQueue);
+ this.dispatchManager = new DispatchManager(parentContext,
bufferQueue);
+ this.scheduledPool = Executors.newSingleThreadScheduledExecutor();
+ this.scheduledPool.scheduleWithFixedDelay(new Runnable() {
+
+ public void run() {
+ dispatchManager.setNeedOutputOvertimeData();
+ }
+ }, this.dispatchManager.getDispatchTimeout(),
this.dispatchManager.getDispatchTimeout(),
+ TimeUnit.MILLISECONDS);
+ // send queue
this.context = new HttpSinkContext(getName(), parentContext,
getChannel(), dispatchQueue);
this.context.start();
for (int i = 0; i < context.getMaxThreads(); i++) {
@@ -54,9 +80,6 @@ public class HttpSink extends AbstractSink implements
Configurable {
this.workers.add(worker);
worker.start();
}
- this.outputChannel =
HttpSinkFactory.createHttpOutputChannel(context);
- this.outputChannel.init();
- this.outputChannel.start();
} catch (Exception e) {
LOG.error("Failed to start HttpSink '{}': {}", this.getName(),
e.getMessage());
}
@@ -71,7 +94,6 @@ public class HttpSink extends AbstractSink implements
Configurable {
worker.close();
}
this.workers.clear();
- this.outputChannel.close();
} catch (Exception e) {
LOG.error("Failed to stop HttpSink '{}': {}", this.getName(),
e.getMessage());
}
@@ -85,6 +107,46 @@ public class HttpSink extends AbstractSink implements
Configurable {
@Override
public Status process() throws EventDeliveryException {
- return Status.BACKOFF;
+ dispatchManager.outputOvertimeData();
+ Channel channel = getChannel();
+ Transaction tx = channel.getTransaction();
+ tx.begin();
+ try {
+ Event event = channel.take();
+ if (event == null) {
+ tx.commit();
+ return Status.BACKOFF;
+ }
+ if (event instanceof ProfileEvent) {
+ ProfileEvent profileEvent = (ProfileEvent) event;
+ this.dispatchQueue.acquire(profileEvent.getBody().length);
+ this.dispatchManager.addEvent(profileEvent);
+ tx.commit();
+ return Status.READY;
+ } else if (event instanceof DispatchProfile) {
+ DispatchProfile dispatchProfile = (DispatchProfile) event;
+ for (ProfileEvent profileEvent : dispatchProfile.getEvents()) {
+ this.dispatchQueue.acquire(profileEvent.getBody().length);
+ this.dispatchManager.addEvent(profileEvent);
+ }
+ tx.commit();
+ return Status.READY;
+ } else {
+ LOG.error("event is not ProfileEvent or
DispatchProfile,class:{}", event.getClass());
+ tx.commit();
+ this.context.addSendFailMetric();
+ return Status.READY;
+ }
+ } catch (Throwable t) {
+ LOG.error("Process event failed!" + this.getName(), t);
+ try {
+ tx.rollback();
+ } catch (Throwable e) {
+ LOG.error("Channel take transaction rollback exception:" +
getName(), e);
+ }
+ return Status.BACKOFF;
+ } finally {
+ tx.close();
+ }
}
}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSinkContext.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSinkContext.java
index 9228cf3957..adc1d30089 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSinkContext.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSinkContext.java
@@ -26,6 +26,7 @@ import
org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
import org.apache.inlong.sort.standalone.config.pojo.InlongId;
+import org.apache.inlong.sort.standalone.dispatch.DispatchProfile;
import org.apache.inlong.sort.standalone.metrics.SortConfigMetricReporter;
import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
@@ -50,6 +51,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
+@SuppressWarnings("deprecation")
public class HttpSinkContext extends SinkContext {
public static final Logger LOG =
InlongLoggerFactory.getLogger(HttpSinkContext.class);
@@ -79,7 +81,7 @@ public class HttpSinkContext extends SinkContext {
private String nodeId;
private Map<String, HttpIdConfig> idConfigMap = new ConcurrentHashMap<>();
private ObjectMapper objectMapper = new ObjectMapper();
- private final BufferQueue<HttpRequest> dispatchQueue;
+ private final BufferQueue<DispatchProfile> dispatchQueue;
private AtomicLong offerCounter = new AtomicLong(0);
private AtomicLong takeCounter = new AtomicLong(0);
private AtomicLong backCounter = new AtomicLong(0);
@@ -96,7 +98,7 @@ public class HttpSinkContext extends SinkContext {
private int logMaxLength = DEFAULT_LOG_MAX_LENGTH;
public HttpSinkContext(String sinkName, Context context, Channel channel,
- BufferQueue<HttpRequest> dispatchQueue) {
+ BufferQueue<DispatchProfile> dispatchQueue) {
super(sinkName, context, channel);
this.sinkContext = context;
this.dispatchQueue = dispatchQueue;
@@ -190,8 +192,8 @@ public class HttpSinkContext extends SinkContext {
this.maxConnect = httpNodeConfig.getMaxConnect();
this.maxConnectPerRoute =
sinkContext.getInteger(KEY_MAX_CONNECT_PER_ROUTE,
DEFAULT_MAX_CONNECT_PER_ROUTE);
- this.connectionRequestTimeout =
- sinkContext.getInteger(KEY_CONNECTION_REQUEST_TIMEOUT,
DEFAULT_CONNECTION_REQUEST_TIMEOUT);
+ this.connectionRequestTimeout =
sinkContext.getInteger(KEY_CONNECTION_REQUEST_TIMEOUT,
+ DEFAULT_CONNECTION_REQUEST_TIMEOUT);
this.socketTimeout = sinkContext.getInteger(KEY_SOCKET_TIMEOUT,
DEFAULT_SOCKET_TIMEOUT);
this.maxRedirects = sinkContext.getInteger(KEY_MAX_REDIRECTS,
DEFAULT_MAX_REDIRECTS);
this.logMaxLength = sinkContext.getInteger(KEY_LOG_MAX_LENGTH,
DEFAULT_LOG_MAX_LENGTH);
@@ -206,8 +208,8 @@ public class HttpSinkContext extends SinkContext {
this.maxConnect = sinkContext.getInteger(KEY_MAX_CONNECT_TOTAL,
DEFAULT_MAX_CONNECT_TOTAL);
this.maxConnectPerRoute =
sinkContext.getInteger(KEY_MAX_CONNECT_PER_ROUTE,
DEFAULT_MAX_CONNECT_PER_ROUTE);
- this.connectionRequestTimeout =
- sinkContext.getInteger(KEY_CONNECTION_REQUEST_TIMEOUT,
DEFAULT_CONNECTION_REQUEST_TIMEOUT);
+ this.connectionRequestTimeout =
sinkContext.getInteger(KEY_CONNECTION_REQUEST_TIMEOUT,
+ DEFAULT_CONNECTION_REQUEST_TIMEOUT);
this.socketTimeout = sinkContext.getInteger(KEY_SOCKET_TIMEOUT,
DEFAULT_SOCKET_TIMEOUT);
this.maxRedirects = sinkContext.getInteger(KEY_MAX_REDIRECTS,
DEFAULT_MAX_REDIRECTS);
this.logMaxLength = sinkContext.getInteger(KEY_LOG_MAX_LENGTH,
DEFAULT_LOG_MAX_LENGTH);
@@ -239,7 +241,7 @@ public class HttpSinkContext extends SinkContext {
long auditFormatTime = msgTime - msgTime %
CommonPropertiesHolder.getAuditFormatInterval();
dimensions.put(SortMetricItem.KEY_MESSAGE_TIME,
String.valueOf(auditFormatTime));
SortMetricItem metricItem =
this.getMetricItemSet().findMetricItem(dimensions);
- metricItem.readFailCount.incrementAndGet();
+ metricItem.sendFailCount.incrementAndGet();
}
public void addSendResultMetric(ProfileEvent currentRecord, String bid,
boolean result, long sendTime) {
@@ -293,27 +295,25 @@ public class HttpSinkContext extends SinkContext {
this.sinkContext = sinkContext;
}
- public void offerDispatchQueue(HttpRequest httpRequest) {
- this.offerCounter.incrementAndGet();
- dispatchQueue.acquire(httpRequest.getEvent().getBody().length);
- dispatchQueue.offer(httpRequest);
- }
-
- public HttpRequest takeDispatchQueue() {
- HttpRequest httpRequest = this.dispatchQueue.pollRecord();
- if (httpRequest != null) {
+ public DispatchProfile takeDispatchQueue() {
+ DispatchProfile dispatchProfile = this.dispatchQueue.pollRecord();
+ if (dispatchProfile != null) {
this.takeCounter.incrementAndGet();
}
- return httpRequest;
+ return dispatchProfile;
}
- public void backDispatchQueue(HttpRequest httpRequest) {
+ public void backDispatchQueue(DispatchProfile dispatchProfile) {
this.backCounter.incrementAndGet();
- dispatchQueue.offer(httpRequest);
+ dispatchQueue.offer(dispatchProfile);
+ }
+
+ public void releaseDispatchQueue(ProfileEvent event) {
+ dispatchQueue.release(event.getBody().length);
}
- public void releaseDispatchQueue(HttpRequest httpRequest) {
- dispatchQueue.release(httpRequest.getEvent().getBody().length);
+ public void releaseDispatchQueue(DispatchProfile dispatchProfile) {
+ dispatchQueue.release(dispatchProfile.getSize());
}
public String getBaseUrl() {
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSinkFactory.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSinkFactory.java
index 6125297bae..a5a8d0f90f 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSinkFactory.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSinkFactory.java
@@ -22,10 +22,6 @@ import
org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
public class HttpSinkFactory {
- public static HttpOutputChannel createHttpOutputChannel(HttpSinkContext
context) {
- return new HttpOutputChannel(context);
- }
-
public static CloseableHttpAsyncClient
createHttpAsyncClient(HttpAsyncClientBuilder builder) {
return builder.build();
}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/IEvent2HttpRequestHandler.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/IEvent2HttpRequestHandler.java
index 65502fb50e..0268c40821 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/IEvent2HttpRequestHandler.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/IEvent2HttpRequestHandler.java
@@ -18,12 +18,17 @@
package org.apache.inlong.sort.standalone.sink.http;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.dispatch.DispatchProfile;
import com.fasterxml.jackson.core.JsonProcessingException;
import java.net.URISyntaxException;
+import java.util.List;
public interface IEvent2HttpRequestHandler {
HttpRequest parse(HttpSinkContext context, ProfileEvent event) throws
URISyntaxException, JsonProcessingException;
+
+ List<HttpRequest> parse(HttpSinkContext context, DispatchProfile
dispatchProfile)
+ throws URISyntaxException, JsonProcessingException;
}