This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 99dec0578b [INLONG-11469][SDK] Optimize the single message processing
logic in the EncodeObject class (#11470)
99dec0578b is described below
commit 99dec0578b1168d0eb61465cd6f473659f4f3ba1
Author: Goson Zhang <[email protected]>
AuthorDate: Fri Nov 8 14:53:25 2024 +0800
[INLONG-11469][SDK] Optimize the single message processing logic in the
EncodeObject class (#11470)
Co-authored-by: gosonzhang <[email protected]>
---
.../inlong/common/msg/AttributeConstants.java | 1 +
.../inlong/sdk/dataproxy/DefaultMessageSender.java | 68 ++++-----
.../apache/inlong/sdk/dataproxy/LoadBalance.java | 6 +-
.../apache/inlong/sdk/dataproxy/MessageSender.java | 44 ++----
.../inlong/sdk/dataproxy/codec/EncodeObject.java | 71 +---------
.../sdk/dataproxy/codec/ProtocolDecoder.java | 9 +-
.../sdk/dataproxy/codec/ProtocolEncoder.java | 154 ++++++++-------------
.../sdk/dataproxy/config/EncryptConfigEntry.java | 3 +-
.../sdk/dataproxy/example/UdpClientExample.java | 46 +++---
.../inlong/sdk/dataproxy/network/ClientMgr.java | 3 +-
.../sdk/dataproxy/pb/PbProtocolMessageSender.java | 34 -----
.../sdk/dataproxy/threads/MetricWorkerThread.java | 5 +-
.../inlong/sdk/dataproxy/utils/LogCounter.java | 47 +++++++
13 files changed, 182 insertions(+), 309 deletions(-)
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java
b/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java
index ceaeef3054..3402fc6455 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java
@@ -21,6 +21,7 @@ public interface AttributeConstants {
String SEPARATOR = "&";
String KEY_VALUE_SEPARATOR = "=";
+ String LINE_FEED_SEP = "\n";
/**
* group id
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
index 623e84d8c0..9e2c8c06b5 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
@@ -36,6 +36,7 @@ import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -229,15 +230,6 @@ public class DefaultMessageSender implements MessageSender
{
return sendIndexResult;
}
- @Deprecated
- public SendResult sendMessage(byte[] body, String attributes, String
msgUUID,
- long timeout, TimeUnit timeUnit) {
- Function<Sender, SendResult> sendOperation =
- (sender) -> sender.syncSendMessage(new EncodeObject(body,
attributes, idGenerator.getNextId()), msgUUID,
- timeout, timeUnit);
- return attemptSendMessage(sendOperation);
- }
-
public SendResult sendMessage(byte[] body, String groupId, String
streamId, long dt, String msgUUID,
long timeout, TimeUnit timeUnit) {
return sendMessage(body, groupId, streamId, dt, msgUUID, timeout,
timeUnit, false);
@@ -275,8 +267,9 @@ public class DefaultMessageSender implements MessageSender {
boolean isCompressEnd = (isCompress && (body.length > cpsSize));
if (msgtype == 7 || msgtype == 8) {
- EncodeObject encodeObject = new EncodeObject(body, msgtype,
isCompressEnd, isReport,
- isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(),
groupId, streamId, proxySend);
+ EncodeObject encodeObject =
+ new EncodeObject(Collections.singletonList(body), msgtype,
isCompressEnd, isReport,
+ isGroupIdTransfer, dt / 1000,
idGenerator.getNextInt(), groupId, streamId, proxySend);
encodeObject.setSupportLF(isSupportLF);
Function<Sender, SendResult> sendOperation = (sender) ->
sender.syncSendMessage(encodeObject, msgUUID,
timeout, timeUnit);
@@ -289,13 +282,13 @@ public class DefaultMessageSender implements
MessageSender {
final long finalDt = dt;
Function<Sender, SendResult> sendOperation;
if (isCompressEnd) {
- sendOperation = (sender) -> sender.syncSendMessage(new
EncodeObject(body,
+ sendOperation = (sender) -> sender.syncSendMessage(new
EncodeObject(Collections.singletonList(body),
"groupId=" + groupId + "&streamId=" + streamId +
"&dt=" + finalDt + "&cp=snappy"
+ finalProxySend,
idGenerator.getNextId(), this.getMsgtype(),
true, groupId), msgUUID, timeout, timeUnit);
} else {
- sendOperation = (sender) -> sender.syncSendMessage(new
EncodeObject(body,
+ sendOperation = (sender) -> sender.syncSendMessage(new
EncodeObject(Collections.singletonList(body),
"groupId=" + groupId + "&streamId=" + streamId +
"&dt=" + finalDt
+ finalProxySend,
idGenerator.getNextId(), this.getMsgtype(),
@@ -347,9 +340,10 @@ public class DefaultMessageSender implements MessageSender
{
boolean isCompressEnd = (isCompress && (body.length > cpsSize));
if (msgtype == 7 || msgtype == 8) {
- EncodeObject encodeObject = new EncodeObject(body, msgtype,
isCompressEnd, isReport,
- isGroupIdTransfer, dt / 1000,
- idGenerator.getNextInt(), groupId, streamId,
attrs.toString());
+ EncodeObject encodeObject =
+ new EncodeObject(Collections.singletonList(body), msgtype,
isCompressEnd, isReport,
+ isGroupIdTransfer, dt / 1000,
+ idGenerator.getNextInt(), groupId, streamId,
attrs.toString());
encodeObject.setSupportLF(isSupportLF);
Function<Sender, SendResult> sendOperation = (sender) ->
sender.syncSendMessage(encodeObject, msgUUID,
timeout, timeUnit);
@@ -358,13 +352,15 @@ public class DefaultMessageSender implements
MessageSender {
attrs.append("&groupId=").append(groupId).append("&streamId=").append(streamId).append("&dt=").append(dt);
if (isCompressEnd) {
attrs.append("&cp=snappy");
- Function<Sender, SendResult> sendOperation = (sender) ->
sender.syncSendMessage(new EncodeObject(body,
- attrs.toString(), idGenerator.getNextId(),
this.getMsgtype(), true, groupId),
+ Function<Sender, SendResult> sendOperation = (sender) ->
sender.syncSendMessage(
+ new EncodeObject(Collections.singletonList(body),
+ attrs.toString(), idGenerator.getNextId(),
this.getMsgtype(), true, groupId),
msgUUID, timeout, timeUnit);
return attemptSendMessage(sendOperation);
} else {
- Function<Sender, SendResult> sendOperation = (sender) ->
sender.syncSendMessage(new EncodeObject(body,
- attrs.toString(), idGenerator.getNextId(),
this.getMsgtype(), false, groupId),
+ Function<Sender, SendResult> sendOperation = (sender) ->
sender.syncSendMessage(
+ new EncodeObject(Collections.singletonList(body),
+ attrs.toString(), idGenerator.getNextId(),
this.getMsgtype(), false, groupId),
msgUUID, timeout, timeUnit);
return attemptSendMessage(sendOperation);
}
@@ -503,13 +499,6 @@ public class DefaultMessageSender implements MessageSender
{
return null;
}
- @Deprecated
- public void asyncSendMessage(SendMessageCallback callback, byte[] body,
String attributes,
- String msgUUID, long timeout, TimeUnit timeUnit) throws
ProxysdkException {
- sender.asyncSendMessage(new EncodeObject(body, attributes,
idGenerator.getNextId()),
- callback, msgUUID, timeout, timeUnit);
- }
-
public void asyncSendMessage(SendMessageCallback callback, byte[] body,
String groupId, String streamId, long dt,
String msgUUID, long timeout, TimeUnit timeUnit) throws
ProxysdkException {
asyncSendMessage(callback, body, groupId, streamId, dt, msgUUID,
timeout, timeUnit, false);
@@ -546,9 +535,10 @@ public class DefaultMessageSender implements MessageSender
{
}
boolean isCompressEnd = (isCompress && (body.length > cpsSize));
if (msgtype == 7 || msgtype == 8) {
- EncodeObject encodeObject = new EncodeObject(body,
this.getMsgtype(), isCompressEnd, isReport,
- isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(),
- groupId, streamId, proxySend);
+ EncodeObject encodeObject =
+ new EncodeObject(Collections.singletonList(body),
this.getMsgtype(), isCompressEnd, isReport,
+ isGroupIdTransfer, dt / 1000,
idGenerator.getNextInt(),
+ groupId, streamId, proxySend);
encodeObject.setSupportLF(isSupportLF);
sender.asyncSendMessage(encodeObject, callback, msgUUID, timeout,
timeUnit);
} else if (msgtype == 3 || msgtype == 5) {
@@ -556,13 +546,13 @@ public class DefaultMessageSender implements
MessageSender {
if (isProxySend) {
proxySend = "&" + proxySend;
}
- sender.asyncSendMessage(new EncodeObject(body, "groupId="
+ sender.asyncSendMessage(new
EncodeObject(Collections.singletonList(body), "groupId="
+ groupId + "&streamId=" + streamId + "&dt=" + dt +
"&cp=snappy" + proxySend,
idGenerator.getNextId(), this.getMsgtype(), true,
groupId),
callback, msgUUID, timeout, timeUnit);
} else {
sender.asyncSendMessage(
- new EncodeObject(body, "groupId=" + groupId +
"&streamId="
+ new EncodeObject(Collections.singletonList(body),
"groupId=" + groupId + "&streamId="
+ streamId + "&dt=" + dt + proxySend,
idGenerator.getNextId(),
this.getMsgtype(), false, groupId),
callback,
@@ -611,21 +601,23 @@ public class DefaultMessageSender implements
MessageSender {
boolean isCompressEnd = (isCompress && (body.length > cpsSize));
if (msgtype == 7 || msgtype == 8) {
- EncodeObject encodeObject = new EncodeObject(body,
this.getMsgtype(), isCompressEnd,
- isReport, isGroupIdTransfer, dt / 1000,
idGenerator.getNextInt(),
- groupId, streamId, attrs.toString());
+ EncodeObject encodeObject =
+ new EncodeObject(Collections.singletonList(body),
this.getMsgtype(), isCompressEnd,
+ isReport, isGroupIdTransfer, dt / 1000,
idGenerator.getNextInt(),
+ groupId, streamId, attrs.toString());
encodeObject.setSupportLF(isSupportLF);
sender.asyncSendMessage(encodeObject, callback, msgUUID, timeout,
timeUnit);
} else if (msgtype == 3 || msgtype == 5) {
attrs.append("&groupId=").append(groupId).append("&streamId=").append(streamId).append("&dt=").append(dt);
if (isCompressEnd) {
attrs.append("&cp=snappy");
- sender.asyncSendMessage(new EncodeObject(body,
attrs.toString(),
+ sender.asyncSendMessage(new
EncodeObject(Collections.singletonList(body), attrs.toString(),
idGenerator.getNextId(), this.getMsgtype(), true,
groupId),
callback, msgUUID, timeout, timeUnit);
} else {
- sender.asyncSendMessage(new EncodeObject(body,
attrs.toString(), idGenerator.getNextId(),
- this.getMsgtype(), false, groupId),
+ sender.asyncSendMessage(
+ new EncodeObject(Collections.singletonList(body),
attrs.toString(), idGenerator.getNextId(),
+ this.getMsgtype(), false, groupId),
callback, msgUUID, timeout, timeUnit);
}
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/LoadBalance.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/LoadBalance.java
index 9ee1726c52..e71838ae76 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/LoadBalance.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/LoadBalance.java
@@ -25,10 +25,10 @@ public enum LoadBalance {
WEIGHT_RANDOM("weight random", 3),
WEIGHT_ROBIN("weight robin", 4);
- private String name;
- private int index;
+ private final String name;
+ private final int index;
- private LoadBalance(String name, int index) {
+ LoadBalance(String name, int index) {
this.name = name;
this.index = index;
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java
index 155031bee6..1b18096229 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java
@@ -27,25 +27,13 @@ import java.util.concurrent.TimeUnit;
public interface MessageSender {
- /**
- * This method provides a synchronized function which you want to send data
- * with extra attributes except groupId,streamId,dt,etc
- * This method is deprecated,we suggest you don't use it.
- *
- * @param body The data will be sent
- * @param attributes The attributes you want to add
- */
- @Deprecated
- public SendResult sendMessage(byte[] body, String attributes, String
msgUUID,
- long timeout, TimeUnit timeUnit);
-
/**
* This method provides a synchronized function which you want to send
data without packing
*
* @param body The data will be sent
*
*/
- public SendResult sendMessage(byte[] body, String groupId, String
streamId, long dt, String msgUUID,
+ SendResult sendMessage(byte[] body, String groupId, String streamId, long
dt, String msgUUID,
long timeout, TimeUnit timeUnit);
/**
@@ -57,7 +45,7 @@ public interface MessageSender {
* @param extraAttrMap The attributes you want to add,
* and each element of extraAttrMap contains a pair
like attrKey,attrValue
*/
- public SendResult sendMessage(byte[] body, String groupId, String
streamId, long dt, String msgUUID,
+ SendResult sendMessage(byte[] body, String groupId, String streamId, long
dt, String msgUUID,
long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap);
/**
@@ -66,7 +54,7 @@ public interface MessageSender {
*
* @param bodyList The data will be sent,which is a collection consisting
of byte arrays
*/
- public SendResult sendMessage(List<byte[]> bodyList, String groupId,
String streamId, long dt, String msgUUID,
+ SendResult sendMessage(List<byte[]> bodyList, String groupId, String
streamId, long dt, String msgUUID,
long timeout, TimeUnit timeUnit);
/**
@@ -78,23 +66,9 @@ public interface MessageSender {
* @param extraAttrMap The attributes you want to add,
* and each element of extraAttrMap contains a pair
like attrKey,attrValue
*/
- public SendResult sendMessage(List<byte[]> bodyList, String groupId,
String streamId, long dt, String msgUUID,
+ SendResult sendMessage(List<byte[]> bodyList, String groupId, String
streamId, long dt, String msgUUID,
long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap);
- /**
- * This method provides an asynchronized function which you want to send
data
- * with extra attributes except groupId,streamId,dt,etc
- * This method is deprecated,we suggest you don't use it.
- *
- *
- * @param body The data will be sent
- * @param attributes The attributes you want to add
- */
- @Deprecated
- public void asyncSendMessage(SendMessageCallback callback,
- byte[] body, String attributes, String msgUUID,
- long timeout, TimeUnit timeUnit) throws ProxysdkException;
-
/**
* This method provides a synchronized function which you want to send
data without packing
* with extra attributes except groupId,streamId,dt,etc
@@ -104,7 +78,7 @@ public interface MessageSender {
* @param extraAttrMap The attributes you want to add,
* and each element of extraAttrMap contains a pair
like attrKey,attrValue
*/
- public void asyncSendMessage(SendMessageCallback callback,
+ void asyncSendMessage(SendMessageCallback callback,
byte[] body, String groupId, String streamId, long dt, String
msgUUID,
long timeout, TimeUnit timeUnit,
Map<String, String> extraAttrMap) throws ProxysdkException;
@@ -116,7 +90,7 @@ public interface MessageSender {
* @param callback The implementation of callback function
* @param body The data will be sent
*/
- public void asyncSendMessage(SendMessageCallback callback,
+ void asyncSendMessage(SendMessageCallback callback,
byte[] body, String groupId, String streamId, long dt, String
msgUUID,
long timeout, TimeUnit timeUnit) throws ProxysdkException;
@@ -126,7 +100,7 @@ public interface MessageSender {
*
* @param bodyList The data will be sent,which is a collection consisting
of byte arrays
*/
- public void asyncSendMessage(SendMessageCallback callback,
+ void asyncSendMessage(SendMessageCallback callback,
List<byte[]> bodyList, String groupId, String streamId, long dt,
String msgUUID,
long timeout, TimeUnit timeUnit) throws ProxysdkException;
@@ -139,7 +113,7 @@ public interface MessageSender {
* @param extraAttrMap The attributes you want to add, and each
* element of extraAttrMap contains a pair like
attrKey,attrValue
*/
- public void asyncSendMessage(SendMessageCallback callback,
+ void asyncSendMessage(SendMessageCallback callback,
List<byte[]> bodyList, String groupId, String streamId, long dt,
String msgUUID,
long timeout, TimeUnit timeUnit,
Map<String, String> extraAttrMap) throws ProxysdkException;
@@ -170,5 +144,5 @@ public interface MessageSender {
void asyncSendMessage(String inlongGroupId, String inlongStreamId,
List<byte[]> bodyList,
SendMessageCallback callback) throws ProxysdkException;
- public void close();
+ void close();
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/EncodeObject.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/EncodeObject.java
index e875d01e77..a89fef4fe9 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/EncodeObject.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/EncodeObject.java
@@ -35,7 +35,6 @@ public class EncodeObject {
private static final Splitter.MapSplitter MAP_SPLITTER =
Splitter.on(AttributeConstants.SEPARATOR).trimResults()
.withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR);
- private byte[] bodyBytes;
private String attributes;
private String messageId;
private int msgtype;
@@ -74,33 +73,12 @@ public class EncodeObject {
}
/* Used by de_serialization. */
- public EncodeObject(byte[] bodyBytes, String attributes) {
- this.bodyBytes = bodyBytes;
+ public EncodeObject(List<byte[]> bodyList, String attributes) {
+ this.bodylist = bodyList;
this.attributes = attributes;
handleAttr(attributes);
}
- /* Used by serialization.But never used */
- // old version:we need add message id by attr
- public EncodeObject(byte[] bodyBytes, String attributes, String messageId)
{
- this.bodyBytes = bodyBytes;
- this.messageId = messageId;
- this.attributes = attributes + "&messageId=" + messageId;
- addRTMS(MsgType.MSG_COMMON_SERVICE.getValue());
- }
-
- // used for bytes initializtion,msgtype=3/5
- public EncodeObject(byte[] bodyBytes, String attributes, String messageId,
- int msgtype, boolean isCompress, final String groupId) {
- this.bodyBytes = bodyBytes;
- this.messageId = messageId;
- this.attributes = attributes + "&messageId=" + messageId;
- this.msgtype = msgtype;
- this.groupId = groupId;
- this.isCompress = isCompress;
- addRTMS(msgtype);
- }
-
// used for bodylist initializtion,msgtype=3/5
public EncodeObject(List<byte[]> bodyList, String attributes, String
messageId,
int msgtype, boolean isCompress, final String groupId) {
@@ -113,23 +91,6 @@ public class EncodeObject {
addRTMS(msgtype);
}
- // used for bytes initializtion,msgtype=7/8
- public EncodeObject(byte[] bodyBytes, int msgtype, boolean isCompress,
boolean isReport,
- boolean isGroupIdTransfer, long dt, long seqId, String groupId,
- String streamId, String commonattr) {
- this.bodyBytes = bodyBytes;
- this.msgtype = msgtype;
- this.isCompress = isCompress;
- this.isReport = isReport;
- this.dt = dt;
- this.isGroupIdTransfer = isGroupIdTransfer;
- this.commonattr = commonattr;
- this.messageId = String.valueOf(seqId);
- this.groupId = groupId;
- this.streamId = streamId;
- addRTMS(msgtype);
- }
-
// used for bodylist initializtion,msgtype=7/8
public EncodeObject(List<byte[]> bodyList, int msgtype, boolean isCompress,
boolean isReport, boolean isGroupIdTransfer, long dt,
@@ -147,26 +108,6 @@ public class EncodeObject {
addRTMS(msgtype);
}
- // file agent, used for bytes initializtion,msgtype=7/8
- public EncodeObject(byte[] bodyBytes, int msgtype, boolean isCompress,
- boolean isReport, boolean isGroupIdTransfer, long dt,
- long seqId, String groupId, String streamId, String commonattr,
- String messageKey, String proxyIp) {
- this.bodyBytes = bodyBytes;
- this.msgtype = msgtype;
- this.isCompress = isCompress;
- this.isReport = isReport;
- this.dt = dt;
- this.isGroupIdTransfer = isGroupIdTransfer;
- this.commonattr = commonattr;
- this.messageId = String.valueOf(seqId);
- this.groupId = groupId;
- this.streamId = streamId;
- this.messageKey = messageKey;
- this.proxyIp = proxyIp;
- addRTMS(msgtype);
- }
-
// file agent, used for bodylist initializtion,msgtype=7/8
public EncodeObject(List<byte[]> bodyList, int msgtype, boolean isCompress,
boolean isReport, boolean isGroupIdTransfer, long dt,
@@ -395,14 +336,6 @@ public class EncodeObject {
this.msgtype = msgtype;
}
- public byte[] getBodyBytes() {
- return bodyBytes;
- }
-
- public void setBodyBytes(byte[] bodyBytes) {
- this.bodyBytes = bodyBytes;
- }
-
public String getAttributes() {
return attributes;
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java
index 63476e7a50..2038a8b8d6 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java
@@ -24,6 +24,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
+import java.util.Collections;
import java.util.List;
public class ProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
@@ -71,7 +72,13 @@ public class ProtocolDecoder extends
MessageToMessageDecoder<ByteBuf> {
attrBytes = new byte[attrLength];
buffer.readBytes(attrBytes);
}
- EncodeObject object = new EncodeObject(bodyBytes, new
String(attrBytes, StandardCharsets.UTF_8));
+ EncodeObject object;
+ if (bodyBytes == null) {
+ object = new EncodeObject(new String(attrBytes,
StandardCharsets.UTF_8));
+ } else {
+ object = new EncodeObject(Collections.singletonList(bodyBytes),
+ new String(attrBytes, StandardCharsets.UTF_8));
+ }
object.setMsgtype(5);
out.add(object);
} else if (msgType == 7) {
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java
index fe6dec9733..ecc1e1de91 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java
@@ -17,10 +17,12 @@
package org.apache.inlong.sdk.dataproxy.codec;
+import org.apache.inlong.common.msg.AttributeConstants;
import org.apache.inlong.sdk.dataproxy.config.EncryptConfigEntry;
import org.apache.inlong.sdk.dataproxy.config.EncryptInfo;
import org.apache.inlong.sdk.dataproxy.network.Utils;
import org.apache.inlong.sdk.dataproxy.utils.EncryptUtil;
+import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
@@ -31,11 +33,10 @@ import org.slf4j.LoggerFactory;
import org.xerial.snappy.Snappy;
import java.io.ByteArrayOutputStream;
-import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
-import java.util.Iterator;
import java.util.List;
import static org.apache.inlong.sdk.dataproxy.ConfigConstants.FLAG_ALLOW_AUTH;
@@ -44,30 +45,26 @@ import static
org.apache.inlong.sdk.dataproxy.ConfigConstants.FLAG_ALLOW_ENCRYPT
public class ProtocolEncoder extends MessageToMessageEncoder<EncodeObject> {
- private static final Logger logger = LoggerFactory
- .getLogger(ProtocolEncoder.class);
+ private static final Logger logger =
LoggerFactory.getLogger(ProtocolEncoder.class);
+ private static final LogCounter exptCounter = new LogCounter(10, 100000,
60 * 1000L);
protected void encode(ChannelHandlerContext ctx,
EncodeObject message, List<Object> out) throws Exception {
ByteBuf buf = null;
try {
- EncodeObject object = message;
- if (object.getMsgtype() == 3) {
- buf = writeToBuf3(object);
+ if (message.getMsgtype() == 3) {
+ buf = writeToBuf3(message);
+ } else if (message.getMsgtype() == 5) {
+ buf = writeToBuf5(message);
+ } else if (message.getMsgtype() == 7) {
+ buf = writeToBuf7(message);
+ } else if (message.getMsgtype() == 8) {
+ buf = writeToBuf8(message);
}
- if (object.getMsgtype() == 5) {
- buf = writeToBuf5(object);
+ } catch (Throwable ex) {
+ if (exptCounter.shouldPrint()) {
+ logger.error("ProtocolEncoder encode message failure", ex);
}
-
- if (object.getMsgtype() == 7) {
- buf = writeToBuf7(object);
- }
- if (object.getMsgtype() == 8) {
- buf = writeToBuf8(object);
- }
- } catch (Exception e) {
- logger.error("{}", e.getMessage());
- e.printStackTrace();
}
if (buf != null) {
out.add(buf);
@@ -113,8 +110,10 @@ public class ProtocolEncoder extends
MessageToMessageEncoder<EncodeObject> {
buf.writeBytes(endAttr.getBytes("utf8"));
}
buf.writeShort(0xee01);
- } catch (Exception e) {
- logger.error(e.getMessage());
+ } catch (Throwable ex) {
+ if (exptCounter.shouldPrint()) {
+ logger.error("Write type8 data exception", ex);
+ }
}
return buf;
}
@@ -176,7 +175,7 @@ public class ProtocolEncoder extends
MessageToMessageEncoder<EncodeObject> {
buf.writeInt((int) object.getDt());
buf.writeShort(cnt);
- buf.writeInt(Integer.valueOf(object.getMessageId()));
+ buf.writeInt(Integer.parseInt(object.getMessageId()));
buf.writeInt(body.length);
buf.writeBytes(body);
@@ -195,53 +194,41 @@ public class ProtocolEncoder extends
MessageToMessageEncoder<EncodeObject> {
byte[] body = null;
int cnt = 1;
- if (object.getBodylist() != null && object.getBodylist().size() !=
0) {
+ if (object.getBodylist() != null &&
!object.getBodylist().isEmpty()) {
if (object.getCnt() > 0) {
cnt = object.getCnt();
} else {
cnt = object.getBodylist().size();
}
-
ByteArrayOutputStream out = new ByteArrayOutputStream();
- Iterator<byte[]> iter = object.getBodylist().iterator();
-
if (object.isSupportLF()) {
+ int totalCnt = 0;
ByteArrayOutputStream data = new ByteArrayOutputStream();
- int len = object.getBodylist().size();
- for (int i = 0; i < len - 1; i++) {
- data.write(object.getBodylist().get(i));
- data.write("\n".getBytes("utf8"));
+ for (byte[] entry : object.getBodylist()) {
+ if (totalCnt++ > 0) {
+ data.write("\n".getBytes("utf8"));
+ }
+ data.write(entry);
}
- data.write(object.getBodylist().get(len - 1));
- ByteBuffer databuffer = ByteBuffer.allocate(4);
- databuffer.putInt(data.toByteArray().length);
- out.write(databuffer.array());
+ ByteBuffer dataBuffer = ByteBuffer.allocate(4);
+ dataBuffer.putInt(data.toByteArray().length);
+ out.write(dataBuffer.array());
out.write(data.toByteArray());
} else {
- while (iter.hasNext()) {
- byte[] entry = iter.next();
- ByteBuffer databuffer = ByteBuffer.allocate(4);
- databuffer.putInt(entry.length);
- out.write(databuffer.array());
+ for (byte[] entry : object.getBodylist()) {
+ ByteBuffer dataBuffer = ByteBuffer.allocate(4);
+ dataBuffer.putInt(entry.length);
+ out.write(dataBuffer.array());
out.write(entry);
}
}
body = out.toByteArray();
}
- // send single message one time
- if (object.getBodyBytes() != null && object.getBodyBytes().length
!= 0) {
- ByteArrayOutputStream out = new ByteArrayOutputStream();
-
- ByteBuffer databuffer = ByteBuffer.allocate(4);
- databuffer.putInt(object.getBodyBytes().length);
- out.write(databuffer.array());
- out.write(object.getBodyBytes());
- body = out.toByteArray();
- }
-
buf = constructBody(body, object, totalLength, cnt);
- } catch (Exception e) {
- logger.error("writeToBuf7 has {}", e);
+ } catch (Throwable ex) {
+ if (exptCounter.shouldPrint()) {
+ logger.error("Write type7 data exception", ex);
+ }
}
return buf;
}
@@ -253,11 +240,9 @@ public class ProtocolEncoder extends
MessageToMessageEncoder<EncodeObject> {
byte[] body = null;
// send multiple messages one time
- if (object.getBodylist() != null && object.getBodylist().size() !=
0) {
+ if (object.getBodylist() != null &&
!object.getBodylist().isEmpty()) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
- Iterator<byte[]> iter = object.getBodylist().iterator();
- while (iter.hasNext()) {
- byte[] entry = iter.next();
+ for (byte[] entry : object.getBodylist()) {
ByteBuffer byteBuffer = ByteBuffer.allocate(4);
byteBuffer.putInt(entry.length);
out.write(byteBuffer.array());
@@ -265,15 +250,6 @@ public class ProtocolEncoder extends
MessageToMessageEncoder<EncodeObject> {
}
body = out.toByteArray();
}
- // send single message one time
- if (object.getBodyBytes() != null && object.getBodyBytes().length
!= 0) {
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- ByteBuffer byteBuffer = ByteBuffer.allocate(4);
- byteBuffer.putInt(object.getBodyBytes().length);
- out.write(byteBuffer.array());
- out.write(object.getBodyBytes());
- body = out.toByteArray();
- }
if (body != null) {
String msgAttrs = object.getAttributes();
if (object.isCompress()) {
@@ -312,25 +288,14 @@ public class ProtocolEncoder extends
MessageToMessageEncoder<EncodeObject> {
buf.writeInt(msgAttrs.getBytes("utf8").length);
buf.writeBytes(msgAttrs.getBytes("utf8"));
}
- } catch (Exception e) {
- logger.error("{}", e.getMessage());
- e.printStackTrace();
+ } catch (Throwable ex) {
+ if (exptCounter.shouldPrint()) {
+ logger.error("Write type5 data exception", ex);
+ }
}
return buf;
}
- /*
- * private ChannelBuffer writeToBuf4(EncodeObject object) { ChannelBuffer
buf = ChannelBuffers.dynamicBuffer(); try
- * { int totalLength = 1 + 4 + 4; byte[] body = null;
- *
- * //send single message one time if (object.getBodyBytes() != null &&
object.getBodyBytes().length != 0) { body =
- * object.getBodyBytes(); } totalLength = totalLength + body.length +
- * object.getAttributes().getBytes("utf8").length;
buf.writeInt(totalLength); buf.writeByte(4);
- * buf.writeInt(body.length); buf.writeBytes(body);
buf.writeInt(object.getAttributes().getBytes().length);
- * buf.writeBytes(object.getAttributes().getBytes()); } catch (Exception
e) { logger.error(e.getMessage()); } return
- * buf; }
- */
-
private ByteBuf writeToBuf3(EncodeObject object) {
ByteBuf buf = null;
try {
@@ -338,20 +303,17 @@ public class ProtocolEncoder extends
MessageToMessageEncoder<EncodeObject> {
byte[] body = null;
// send multiple messages one time
- if (object.getBodylist() != null && object.getBodylist().size() !=
0) {
+ if (object.getBodylist() != null &&
!object.getBodylist().isEmpty()) {
+ int totalCnt = 0;
ByteArrayOutputStream out = new ByteArrayOutputStream();
- Iterator<byte[]> iter = object.getBodylist().iterator();
- while (iter.hasNext()) {
- byte[] entry = iter.next();
+ for (byte[] entry : object.getBodylist()) {
+ if (totalCnt++ > 0) {
+
out.write(AttributeConstants.LINE_FEED_SEP.getBytes(StandardCharsets.UTF_8));
+ }
out.write(entry);
- out.write("\n".getBytes("utf8"));
}
body = out.toByteArray();
}
- // send single message one time
- if (object.getBodyBytes() != null && object.getBodyBytes().length
!= 0) {
- body = object.getBodyBytes();
- }
if (body != null) {
String msgAttrs = object.getAttributes();
if (object.isCompress()) {
@@ -390,9 +352,10 @@ public class ProtocolEncoder extends
MessageToMessageEncoder<EncodeObject> {
buf.writeInt(msgAttrs.getBytes("utf8").length);
buf.writeBytes(msgAttrs.getBytes("utf8"));
}
- } catch (Exception e) {
- logger.error("{}", e.getMessage());
- e.printStackTrace();
+ } catch (Throwable ex) {
+ if (exptCounter.shouldPrint()) {
+ logger.error("Write type3 data exception", ex);
+ }
}
return buf;
}
@@ -407,9 +370,10 @@ public class ProtocolEncoder extends
MessageToMessageEncoder<EncodeObject> {
tmpData, 0);
body = new byte[len];
System.arraycopy(tmpData, 0, body, 0, len);
- } catch (IOException e) {
- logger.error("{}", e.getMessage());
- e.printStackTrace();
+ } catch (Throwable ex) {
+ if (exptCounter.shouldPrint()) {
+ logger.error("Compress data exception", ex);
+ }
}
return body;
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/EncryptConfigEntry.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/EncryptConfigEntry.java
index f27506fe22..6acfe09d8a 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/EncryptConfigEntry.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/EncryptConfigEntry.java
@@ -35,7 +35,7 @@ public class EncryptConfigEntry implements
java.io.Serializable {
private String pubKey;
private byte[] aesKey;
private String rsaEncryptedKey;
- private AtomicLong lastUpdateTime = new AtomicLong(0);
+ private final AtomicLong lastUpdateTime = new AtomicLong(0);
public EncryptConfigEntry(final String userName, final String version,
final String pubKey) {
this.userName = userName;
@@ -43,7 +43,6 @@ public class EncryptConfigEntry implements
java.io.Serializable {
this.pubKey = pubKey;
this.aesKey = null;
this.rsaEncryptedKey = null;
- // this.rsaKey = EncryptUtil.loadPublicKeyByText(pubKey);
}
public String getVersion() {
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/UdpClientExample.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/UdpClientExample.java
index 26e490fc37..863f197353 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/UdpClientExample.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/UdpClientExample.java
@@ -45,7 +45,7 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
-import java.util.Iterator;
+import java.util.Collections;
import java.util.concurrent.TimeUnit;
import static
org.apache.inlong.sdk.dataproxy.ConfigConstants.FLAG_ALLOW_COMPRESS;
@@ -116,9 +116,8 @@ public class UdpClientExample {
boolean isGroupIdTransfer, long dt, long seqId, String groupId,
String streamId,
String attr) throws UnsupportedEncodingException {
EncodeObject encodeObject =
- new EncodeObject(getRandomString(5).getBytes("UTF-8"), msgType,
- isCompress,
- isReport, isGroupIdTransfer, dt, seqId, groupId,
streamId, attr);
+ new
EncodeObject(Collections.singletonList(getRandomString(5).getBytes("UTF-8")),
+ msgType, isCompress, isReport, isGroupIdTransfer, dt,
seqId, groupId, streamId, attr);
return encodeObject;
}
@@ -142,48 +141,37 @@ public class UdpClientExample {
byte[] body = null;
int cnt = 1;
- if (object.getBodylist() != null && object.getBodylist().size() !=
0) {
+ if (object.getBodylist() != null &&
!object.getBodylist().isEmpty()) {
if (object.getCnt() > 0) {
cnt = object.getCnt();
} else {
cnt = object.getBodylist().size();
}
ByteArrayOutputStream out = new ByteArrayOutputStream();
- Iterator<byte[]> iter = object.getBodylist().iterator();
if (object.isSupportLF()) {
+ int totalCnt = 0;
ByteArrayOutputStream data = new ByteArrayOutputStream();
- int len = object.getBodylist().size();
- for (int i = 0; i < len - 1; i++) {
- data.write(object.getBodylist().get(i));
- data.write("\n".getBytes("utf8"));
+ for (byte[] entry : object.getBodylist()) {
+ if (totalCnt++ > 0) {
+ data.write("\n".getBytes("utf8"));
+ }
+ data.write(entry);
}
- data.write(object.getBodylist().get(len - 1));
- ByteBuffer databuffer = ByteBuffer.allocate(4);
- databuffer.putInt(data.toByteArray().length);
- out.write(databuffer.array());
+ ByteBuffer dataBuffer = ByteBuffer.allocate(4);
+ dataBuffer.putInt(data.toByteArray().length);
+ out.write(dataBuffer.array());
out.write(data.toByteArray());
} else {
- while (iter.hasNext()) {
- byte[] entry = iter.next();
- ByteBuffer databuffer = ByteBuffer.allocate(4);
- databuffer.putInt(entry.length);
- out.write(databuffer.array());
+ for (byte[] entry : object.getBodylist()) {
+ ByteBuffer dataBuffer = ByteBuffer.allocate(4);
+ dataBuffer.putInt(entry.length);
+ out.write(dataBuffer.array());
out.write(entry);
}
}
body = out.toByteArray();
}
- if (object.getBodyBytes() != null && object.getBodyBytes().length
!= 0) {
- ByteArrayOutputStream out = new ByteArrayOutputStream();
-
- ByteBuffer databuffer = ByteBuffer.allocate(4);
- databuffer.putInt(object.getBodyBytes().length);
- out.write(databuffer.array());
- out.write(object.getBodyBytes());
- body = out.toByteArray();
- }
-
if (body != null) {
if (object.isCompress()) {
body = processCompress(body);
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
index c891788465..e1412a9936 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
@@ -847,7 +847,8 @@ public class ClientMgr {
}
logger.debug("active host to send heartbeat! {}",
hostInfo.getReferenceName());
String hbMsg = "heartbeat:" + hostInfo.getHostName();
- EncodeObject encodeObject = new
EncodeObject(hbMsg.getBytes(StandardCharsets.UTF_8),
+ EncodeObject encodeObject = new EncodeObject(
+
Collections.singletonList(hbMsg.getBytes(StandardCharsets.UTF_8)),
8, false, false, false, System.currentTimeMillis() / 1000, 1,
"", "", "");
try {
if (configure.isNeedAuthentication()) {
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/PbProtocolMessageSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/PbProtocolMessageSender.java
index d9817b7a03..f0bd45d13e 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/PbProtocolMessageSender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/PbProtocolMessageSender.java
@@ -171,22 +171,6 @@ public class PbProtocolMessageSender implements
MessageSender, Configurable {
return context;
}
- /**
- * sendMessage
- *
- * @param body
- * @param attributes
- * @param msgUUID
- * @param timeout
- * @param timeUnit
- * @return SendResult
- * @deprecated
- */
- @Override
- public SendResult sendMessage(byte[] body, String attributes, String
msgUUID, long timeout, TimeUnit timeUnit) {
- return SendResult.INVALID_ATTRIBUTES;
- }
-
/**
* sendMessage
*
@@ -345,24 +329,6 @@ public class PbProtocolMessageSender implements
MessageSender, Configurable {
return refResult.get();
}
- /**
- * asyncSendMessage
- *
- * @param callback
- * @param body
- * @param attributes
- * @param msgUUID
- * @param timeout
- * @param timeUnit
- * @throws ProxysdkException
- * @deprecated
- */
- @Override
- public void asyncSendMessage(SendMessageCallback callback, byte[] body,
String attributes, String msgUUID,
- long timeout, TimeUnit timeUnit) throws ProxysdkException {
- throw new ProxysdkException("Not support");
- }
-
/**
* asyncSendMessage
*
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
index e735850360..ed62ccb4b6 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
@@ -32,6 +32,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
+import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -195,7 +196,7 @@ public class MetricWorkerThread extends Thread implements
Closeable {
sender.asyncSendMessage(encodeObject, callBack,
String.valueOf(System.currentTimeMillis()), 20,
TimeUnit.SECONDS);
} else {
- logger.error("Send metric failure: {} {}",
encodeObject.getBodyBytes(), encodeObject.getBodylist());
+ logger.error("Send metric failure: {}",
encodeObject.getBodylist());
}
} catch (Throwable ex) {
logger.warn("Send metric throw exception", ex);
@@ -204,7 +205,7 @@ public class MetricWorkerThread extends Thread implements
Closeable {
}
private void sendSingleLine(String line, String streamId, long dtTime) {
- EncodeObject encodeObject = new EncodeObject(line.getBytes(), 7,
+ EncodeObject encodeObject = new
EncodeObject(Collections.singletonList(line.getBytes()), 7,
false, false, false,
dtTime, idGenerator.getNextInt(),
metricConfig.getMetricGroupId(), streamId, "", "",
Utils.getLocalIp());
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/LogCounter.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/LogCounter.java
new file mode 100644
index 0000000000..edb14c62b2
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/LogCounter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.utils;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class LogCounter {
+
+ private final AtomicLong counter = new AtomicLong(0);
+
+ private long start = 10L;
+ private long control = 100000L;
+ private long reset = 60 * 1000L;
+
+ private AtomicLong lastLogTime = new
AtomicLong(System.currentTimeMillis());
+
+ public LogCounter(long start, long control, long reset) {
+ this.start = start;
+ this.control = control;
+ this.reset = reset;
+ }
+
+ public boolean shouldPrint() {
+ long curTime = lastLogTime.get();
+ if (System.currentTimeMillis() - curTime > reset) {
+ if (lastLogTime.compareAndSet(curTime,
System.currentTimeMillis())) {
+ counter.set(0);
+ }
+ }
+ return counter.incrementAndGet() <= start || counter.get() % control
== 0;
+ }
+}