This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 9d36b2338c [INLONG-11745][SDK] Clean up HttpProxySender and related
implementations (#11748)
9d36b2338c is described below
commit 9d36b2338c1b7b3f151582eb447d9b6af2abbfe7
Author: Goson Zhang <[email protected]>
AuthorDate: Tue Feb 11 16:19:19 2025 +0800
[INLONG-11745][SDK] Clean up HttpProxySender and related implementations
(#11748)
* [INLONG-11745][SDK] Clean up HttpProxySender and related implementations
* [INLONG-11745][SDK] Clean up HttpProxySender and related implementations
---------
Co-authored-by: gosonzhang <[email protected]>
---
.../sdk/dataproxy/common/SendMessageCallback.java | 32 ---
.../sdk/dataproxy/http/InternalHttpSender.java | 255 ---------------------
.../inlong/sdk/dataproxy/network/HttpMessage.java | 76 ------
.../sdk/dataproxy/network/HttpProxySender.java | 229 ------------------
.../sdk/dataproxy/utils/ConcurrentHashSet.java | 45 ----
.../sdk/dataproxy/utils/ConsistencyHashUtil.java | 37 ---
.../inlong/sdk/dataproxy/utils/MapBackedSet.java | 74 ------
7 files changed, 748 deletions(-)
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendMessageCallback.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendMessageCallback.java
deleted file mode 100644
index 48ce607037..0000000000
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendMessageCallback.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dataproxy.common;
-
-@Deprecated
-/**
- * Replace by MsgSendCallback
- *
- */
-public interface SendMessageCallback {
-
- /* Invoked when a message is confirmed by TDBus. */
- void onMessageAck(SendResult result);
-
- /* Invoked when a message transportation interrupted by an exception. */
- void onException(Throwable e);
-}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/http/InternalHttpSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/http/InternalHttpSender.java
deleted file mode 100644
index 382ba58ad8..0000000000
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/http/InternalHttpSender.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dataproxy.http;
-
-import org.apache.inlong.common.enums.DataProxyErrCode;
-import org.apache.inlong.sdk.dataproxy.common.SendResult;
-import org.apache.inlong.sdk.dataproxy.config.HostInfo;
-import org.apache.inlong.sdk.dataproxy.network.HttpMessage;
-import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
-import org.apache.inlong.sdk.dataproxy.utils.ConcurrentHashSet;
-import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
-
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.HttpHeaders;
-import org.apache.http.HttpStatus;
-import org.apache.http.client.config.RequestConfig;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.client.utils.URLEncodedUtils;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClientBuilder;
-import org.apache.http.message.BasicNameValuePair;
-import org.apache.http.util.EntityUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-@Deprecated
-/**
- * Replace by InLongHttpMsgSender
- */
-public class InternalHttpSender {
-
- private static final Logger logger =
LoggerFactory.getLogger(InternalHttpSender.class);
-
- private final HttpMsgSenderConfig httpConfig;
- private final ConcurrentHashSet<HostInfo> hostList;
-
- private final LinkedBlockingQueue<HttpMessage> messageCache;
- private final ExecutorService workerServices =
Executors.newCachedThreadPool();
- private CloseableHttpClient httpClient;
- private boolean bShutDown = false;
-
- public InternalHttpSender(HttpMsgSenderConfig httpConfig,
- ConcurrentHashSet<HostInfo> hostList,
LinkedBlockingQueue<HttpMessage> messageCache) {
- this.httpConfig = httpConfig;
- this.hostList = hostList;
- this.messageCache = messageCache;
- submitWorkThread();
- }
-
- private void submitWorkThread() {
- for (int i = 0; i < httpConfig.getHttpAsyncRptWorkerNum(); i++) {
- workerServices.execute(new WorkerRunner());
- }
- }
-
- /**
- * construct header
- */
- private ArrayList<BasicNameValuePair> getHeaders(List<String> bodies,
- String groupId, String streamId, long dt) {
- ArrayList<BasicNameValuePair> params = new ArrayList<>();
- params.add(new BasicNameValuePair("groupId", groupId));
- params.add(new BasicNameValuePair("streamId", streamId));
- params.add(new BasicNameValuePair("dt", String.valueOf(dt)));
- params.add(new BasicNameValuePair("body", StringUtils.join(bodies,
"\n")));
- params.add(new BasicNameValuePair("cnt",
String.valueOf(bodies.size())));
-
- return params;
- }
-
- /**
- * http client
- */
- private synchronized CloseableHttpClient constructHttpClient(long timeout,
TimeUnit timeUnit) {
- if (httpClient != null) {
- return httpClient;
- }
- long timeoutInMs = timeUnit.toMillis(timeout);
- RequestConfig requestConfig = RequestConfig.custom()
- .setConnectTimeout((int) timeoutInMs)
- .setSocketTimeout((int) timeoutInMs).build();
- HttpClientBuilder httpClientBuilder = HttpClientBuilder.create();
- httpClientBuilder.setDefaultRequestConfig(requestConfig);
- return httpClientBuilder.build();
- }
-
- /**
- * check cache runner
- */
- private class WorkerRunner implements Runnable {
-
- @Override
- public void run() {
- // if not shutdown or queue is not empty
- while (!bShutDown || !messageCache.isEmpty()) {
- try {
- while (!messageCache.isEmpty()) {
- HttpMessage httpMessage = messageCache.poll();
- if (httpMessage != null) {
- SendResult result = sendMessageWithHostInfo(
- httpMessage.getBodies(),
httpMessage.getGroupId(),
- httpMessage.getStreamId(),
httpMessage.getDt(),
- httpMessage.getTimeout(),
httpMessage.getTimeUnit());
- httpMessage.getCallback().onMessageAck(result);
- }
- }
-
TimeUnit.MILLISECONDS.sleep(httpConfig.getHttpAsyncWorkerIdleWaitMs());
- } catch (Exception exception) {
- logger.error("exception caught", exception);
- }
- }
- }
- }
-
- /**
- * get random ip
- *
- * @return list of host info
- */
- public List<HostInfo> getRandomHostInfo() {
- List<HostInfo> tmpHostList = new ArrayList<>(hostList);
- Collections.shuffle(tmpHostList);
- // respect alive connection
- int maxIndex = Math.min(httpConfig.getAliveConnections(),
tmpHostList.size());
- return tmpHostList.subList(0, maxIndex);
- }
-
- /**
- * send request by http
- */
- private SendResult sendByHttp(List<String> bodies, String groupId, String
streamId, long dt,
- long timeout, TimeUnit timeUnit, HostInfo hostInfo) throws
Exception {
- HttpPost httpPost = null;
- CloseableHttpResponse response = null;
- try {
- if (httpClient == null) {
- httpClient = constructHttpClient(timeout, timeUnit);
- }
-
- String url = "http://" + hostInfo.getHostName() + ":" +
hostInfo.getPortNumber() + "/dataproxy/message";
- httpPost = new HttpPost(url);
- httpPost.setHeader(HttpHeaders.CONNECTION, "close");
- httpPost.setHeader(HttpHeaders.CONTENT_TYPE,
"application/x-www-form-urlencoded");
- ArrayList<BasicNameValuePair> contents = getHeaders(bodies,
groupId, streamId, dt);
- String encodedContents = URLEncodedUtils.format(contents,
StandardCharsets.UTF_8);
- httpPost.setEntity(new StringEntity(encodedContents));
-
- logger.info("begin to post request to {}, encoded content is: {}",
url, encodedContents);
- response = httpClient.execute(httpPost);
-
- String returnStr = EntityUtils.toString(response.getEntity());
- int returnCode = response.getStatusLine().getStatusCode();
- if (StringUtils.isBlank(returnStr) || HttpStatus.SC_OK !=
returnCode) {
- throw new Exception("get config from manager failed, result: "
+ returnStr + ", code: " + returnCode);
- }
-
- logger.debug("success to get config from manager, result str: " +
returnStr);
- JsonObject jsonResponse =
JsonParser.parseString(returnStr).getAsJsonObject();
- JsonElement codeElement = jsonResponse.get("code");
- if (codeElement != null) {
- if (DataProxyErrCode.SUCCESS.getErrCode() ==
codeElement.getAsInt()) {
- return SendResult.OK;
- } else {
- return SendResult.INVALID_DATA;
- }
- }
- } finally {
- if (httpPost != null) {
- httpPost.releaseConnection();
- }
- if (response != null) {
- response.close();
- }
- }
-
- return SendResult.UNKOWN_ERROR;
- }
-
- /**
- * send message with host info
- */
- public SendResult sendMessageWithHostInfo(List<String> bodies, String
groupId, String streamId, long dt,
- long timeout, TimeUnit timeUnit) {
-
- List<HostInfo> randomHostList = getRandomHostInfo();
- Exception tmpException = null;
- for (HostInfo hostInfo : randomHostList) {
- try {
- return sendByHttp(bodies, groupId, streamId, dt, timeout,
timeUnit, hostInfo);
- } catch (Exception exception) {
- tmpException = exception;
- logger.debug("error while sending data, resending it",
exception);
- }
- }
- if (tmpException != null) {
- logger.error("error while sending data", tmpException);
- }
- return SendResult.UNKOWN_ERROR;
- }
-
- /**
- * close resources
- */
- public void close() throws Exception {
- bShutDown = true;
- if (!messageCache.isEmpty()) {
- if (httpConfig.isDiscardHttpCacheWhenClosing()) {
- messageCache.clear();
- } else {
- long curTime = System.currentTimeMillis();
- while (!messageCache.isEmpty()
- || (System.currentTimeMillis() - curTime) <
httpConfig.getHttpCloseWaitPeriodMs()) {
- ProxyUtils.sleepSomeTime(100);
- }
- if (!messageCache.isEmpty()) {
- logger.warn("Close httpClient, remain {} messages",
messageCache.size());
- messageCache.clear();
- }
- }
- }
- if (httpClient != null) {
- httpClient.close();
- }
- workerServices.shutdown();
- }
-}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpMessage.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpMessage.java
deleted file mode 100644
index da3bbf45e2..0000000000
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpMessage.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dataproxy.network;
-
-import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-/**
- * http message for cache.
- */
-public class HttpMessage {
-
- private final String groupId;
- private final String streamId;
- private final List<String> bodies;
- private final SendMessageCallback callback;
- private final long dt;
- private final long timeout;
- private final TimeUnit timeUnit;
-
- public HttpMessage(List<String> bodies, String groupId, String streamId,
long dt,
- long timeout, TimeUnit timeUnit, SendMessageCallback callback) {
- this.groupId = groupId;
- this.streamId = streamId;
- this.bodies = bodies;
- this.callback = callback;
- this.dt = dt;
- this.timeout = timeout;
- this.timeUnit = timeUnit;
- }
-
- public String getGroupId() {
- return groupId;
- }
-
- public String getStreamId() {
- return streamId;
- }
-
- public List<String> getBodies() {
- return bodies;
- }
-
- public SendMessageCallback getCallback() {
- return callback;
- }
-
- public long getDt() {
- return dt;
- }
-
- public long getTimeout() {
- return timeout;
- }
-
- public TimeUnit getTimeUnit() {
- return timeUnit;
- }
-}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
deleted file mode 100644
index f4bcde915f..0000000000
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dataproxy.network;
-
-import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
-import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
-import org.apache.inlong.sdk.dataproxy.common.SendResult;
-import org.apache.inlong.sdk.dataproxy.config.HostInfo;
-import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
-import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
-import org.apache.inlong.sdk.dataproxy.http.InternalHttpSender;
-import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
-import org.apache.inlong.sdk.dataproxy.utils.ConcurrentHashSet;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-
-@Deprecated
-/**
- * http sender
- * Replace by InLongHttpMsgSender
- */
-public class HttpProxySender extends Thread {
-
- private static final Logger logger =
LoggerFactory.getLogger(HttpProxySender.class);
-
- private final ConcurrentHashSet<HostInfo> hostList = new
ConcurrentHashSet<>();
-
- private final HttpMsgSenderConfig proxyClientConfig;
- private ProxyConfigManager proxyConfigManager;
-
- private boolean bShutDown = false;
-
- private final InternalHttpSender internalHttpSender;
- private final LinkedBlockingQueue<HttpMessage> messageCache;
-
- public HttpProxySender(HttpMsgSenderConfig httpConfig) throws Exception {
- logger.info("Initial http sender, configure is {}", httpConfig);
- this.proxyClientConfig = httpConfig;
- initTDMClientAndRequest(httpConfig);
- this.messageCache = new
LinkedBlockingQueue<>(httpConfig.getHttpAsyncRptCacheSize());
- internalHttpSender = new InternalHttpSender(httpConfig, hostList,
messageCache);
- }
-
- /**
- * get proxy list
- *
- * @param httpConfig
- * @throws Exception
- */
- private void initTDMClientAndRequest(HttpMsgSenderConfig httpConfig)
throws Exception {
-
- try {
- proxyConfigManager = new ProxyConfigManager(httpConfig);
- ProxyConfigEntry proxyConfigEntry = retryGettingProxyConfig();
- hostList.addAll(proxyConfigEntry.getHostMap().values());
-
- this.setDaemon(true);
- this.start();
- } catch (Throwable e) {
- if (httpConfig.isOnlyUseLocalProxyConfig()) {
- throw new Exception("Get local proxy configure failure! e =
{}", e);
- } else {
- throw new Exception("Visit TDManager error! e = {}", e);
- }
- }
- logger.info("http proxy sender starts");
- }
-
- /**
- * retry fetching proxy config in case of network issue.
- *
- * @return proxy config entry.
- */
- private ProxyConfigEntry retryGettingProxyConfig() {
- ProcessResult procResult = new ProcessResult();
- if (proxyConfigManager.getGroupIdConfigure(true, procResult)) {
- return (ProxyConfigEntry) procResult.getRetData();
- }
- return null;
- }
-
- /**
- * get proxy list
- */
- @Override
- public void run() {
- ProcessResult procResult = new ProcessResult();
- while (!bShutDown) {
- try {
- int rand = ThreadLocalRandom.current().nextInt(0, 600);
- long randSleepTime = proxyClientConfig.getMgrMetaSyncInrMs() +
rand;
- TimeUnit.MILLISECONDS.sleep(randSleepTime);
- if (proxyConfigManager != null) {
- if (!proxyConfigManager.getGroupIdConfigure(false,
procResult)) {
- throw new Exception(procResult.toString());
- }
- ProxyConfigEntry configEntry = (ProxyConfigEntry)
procResult.getRetData();
- hostList.addAll(configEntry.getHostMap().values());
- hostList.retainAll(configEntry.getHostMap().values());
- } else {
- logger.error("manager is null, please check it!");
- }
- logger.info("get new proxy list " + hostList.toString());
- } catch (InterruptedException ignored) {
- // ignore it.
- } catch (Exception ex) {
- logger.error("managerFetcher get or save managerIpList occur
error,", ex);
- }
- }
- }
-
- /**
- * send by http
- *
- * @param body
- * @param groupId
- * @param streamId
- * @param dt
- * @param timeout
- * @param timeUnit
- * @return
- */
- public SendResult sendMessage(String body, String groupId, String
streamId, long dt,
- long timeout, TimeUnit timeUnit) {
- return sendMessage(Collections.singletonList(body), groupId, streamId,
dt, timeout, timeUnit);
- }
-
- /**
- * send multiple messages.
- *
- * @param bodies list of bodies
- * @param groupId
- * @param streamId
- * @param dt
- * @param timeout
- * @param timeUnit
- * @return
- */
- public SendResult sendMessage(List<String> bodies, String groupId, String
streamId, long dt,
- long timeout, TimeUnit timeUnit) {
- if (hostList.isEmpty()) {
- logger.error("proxy list is empty, maybe client has been "
- + "closed or groupId is not assigned with proxy list");
- return SendResult.NO_CONNECTION;
- }
- return internalHttpSender.sendMessageWithHostInfo(
- bodies, groupId, streamId, dt, timeout, timeUnit);
-
- }
-
- /**
- * async sender
- *
- * @param bodies
- * @param groupId
- * @param streamId
- * @param dt
- * @param timeout
- * @param timeUnit
- * @param callback
- */
- public void asyncSendMessage(List<String> bodies, String groupId, String
streamId, long dt,
- long timeout, TimeUnit timeUnit, SendMessageCallback callback) {
- List<String> bodyList = new ArrayList<>(bodies);
- HttpMessage httpMessage = new HttpMessage(bodyList, groupId, streamId,
dt,
- timeout, timeUnit, callback);
- try {
- if (!messageCache.offer(httpMessage)) {
- callback.onMessageAck(SendResult.ASYNC_CALLBACK_BUFFER_FULL);
- }
- } catch (Exception exception) {
- logger.error("error async sending data", exception);
- }
- }
-
- /**
- * async send single message.
- *
- * @param body
- * @param groupId
- * @param streamId
- * @param dt
- * @param timeout
- * @param timeUnit
- * @param callback
- */
- public void asyncSendMessage(String body, String groupId, String streamId,
long dt,
- long timeout, TimeUnit timeUnit, SendMessageCallback callback) {
- asyncSendMessage(Collections.singletonList(body), groupId, streamId,
- dt, timeout, timeUnit, callback);
- }
-
- /**
- * close
- */
- public void close() {
- hostList.clear();
- bShutDown = true;
- try {
- this.interrupt();
- internalHttpSender.close();
- } catch (Exception exception) {
- logger.error("error while closing http client", exception);
- }
- }
-}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ConcurrentHashSet.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ConcurrentHashSet.java
deleted file mode 100644
index 574450263a..0000000000
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ConcurrentHashSet.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dataproxy.utils;
-
-import java.util.Collection;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * A {@link ConcurrentHashMap}-backed {@link Set}.
- */
-public class ConcurrentHashSet<E> extends MapBackedSet<E> {
-
- private static final long serialVersionUID = 8518578988740277828L;
-
- public ConcurrentHashSet() {
- super(new ConcurrentHashMap<E, Boolean>());
- }
-
- public ConcurrentHashSet(Collection<E> c) {
- super(new ConcurrentHashMap<E, Boolean>(), c);
- }
-
- @Override
- public boolean add(E o) {
- Boolean answer = ((ConcurrentMap<E, Boolean>) map).putIfAbsent(o,
Boolean.TRUE);
- return answer == null;
- }
-}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ConsistencyHashUtil.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ConsistencyHashUtil.java
deleted file mode 100644
index d352aa6d89..0000000000
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ConsistencyHashUtil.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dataproxy.utils;
-
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hashing;
-
-import java.nio.charset.StandardCharsets;
-import java.util.Random;
-
-public class ConsistencyHashUtil {
-
- public static String hashMurMurHash(String key, int seed) {
- HashFunction hashFunction = Hashing.murmur3_128(seed);
- return hashFunction.hashString(key, StandardCharsets.UTF_8).toString();
- }
-
- public static String hashMurMurHash(String key) {
- Random random = new Random(System.currentTimeMillis());
- return hashMurMurHash(key, random.nextInt());
- }
-}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/MapBackedSet.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/MapBackedSet.java
deleted file mode 100644
index 2b1d7a31ef..0000000000
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/MapBackedSet.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dataproxy.utils;
-
-import java.io.Serializable;
-import java.util.AbstractSet;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * A {@link Map}-backed {@link Set}.
- */
-public class MapBackedSet<E> extends AbstractSet<E> implements Serializable {
-
- private static final long serialVersionUID = -8347878570391674042L;
-
- protected final Map<E, Boolean> map;
-
- public MapBackedSet(Map<E, Boolean> map) {
- this.map = map;
- }
-
- public MapBackedSet(Map<E, Boolean> map, Collection<E> c) {
- this.map = map;
- addAll(c);
- }
-
- @Override
- public int size() {
- return map.size();
- }
-
- @Override
- public boolean contains(Object o) {
- return map.containsKey(o);
- }
-
- @Override
- public Iterator<E> iterator() {
- return map.keySet().iterator();
- }
-
- @Override
- public boolean add(E o) {
- return map.put(o, Boolean.TRUE) == null;
- }
-
- @Override
- public boolean remove(Object o) {
- return map.remove(o) != null;
- }
-
- @Override
- public void clear() {
- map.clear();
- }
-}