This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-613
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/INLONG-613 by this push:
new eca5788 [INLONG-641] Add inlong-common module (#486)
eca5788 is described below
commit eca5788a5ce94fed7236982223989da189dfde8e
Author: gosonzhang <[email protected]>
AuthorDate: Wed Jun 30 16:56:36 2021 +0800
[INLONG-641] Add inlong-common module (#486)
Co-authored-by: gosonzhang <[email protected]>
---
inlong-common/pom.xml | 148 +++
.../inlong/commons/msg/AttributeConstants.java | 74 ++
.../apache/inlong/commons/msg/DataInputBuffer.java | 116 ++
.../inlong/commons/msg/DataOutputBuffer.java | 133 +++
.../java/org/apache/inlong/commons/msg/TDMsg1.java | 1108 ++++++++++++++++++++
.../inlong/commons/msg/TDMsgAttrBuilder.java | 457 ++++++++
pom.xml | 1 +
7 files changed, 2037 insertions(+)
diff --git a/inlong-common/pom.xml b/inlong-common/pom.xml
new file mode 100644
index 0000000..265f3b4
--- /dev/null
+++ b/inlong-common/pom.xml
@@ -0,0 +1,148 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>inlong</artifactId>
+ <version>0.9.0-incubating-SNAPSHOT</version>
+ </parent>
+ <packaging>jar</packaging>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>inlong-common</artifactId>
+ <name>Apache InLong - Common</name>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ <version>1.1.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.6.4</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.4</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>11.0.2</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <sourceDirectory>src/main/java</sourceDirectory>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ </resources>
+ <plugins>
+ <!-- bind the maven-assembly-plugin to the package phase this will
create
+ a jar file without the storm dependencies suitable for
deployment to a cluster. -->
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ <archive>
+ <manifest>
+ <mainClass></mainClass>
+ </manifest>
+ </archive>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ <pluginManagement>
+ <plugins>
+ <!--This plugin's configuration is used to store Eclipse m2e
settings
+ only. It has no influence on the Maven build itself. -->
+ <plugin>
+ <groupId>org.eclipse.m2e</groupId>
+ <artifactId>lifecycle-mapping</artifactId>
+ <version>1.0.0</version>
+ <configuration>
+ <lifecycleMappingMetadata>
+ <pluginExecutions>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>
+ com.theoryinpractise
+ </groupId>
+ <artifactId>
+ clojure-maven-plugin
+ </artifactId>
+ <versionRange>
+ [1.3.8,)
+ </versionRange>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore></ignore>
+ </action>
+ </pluginExecution>
+ </pluginExecutions>
+ </lifecycleMappingMetadata>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ </build>
+</project>
\ No newline at end of file
diff --git
a/inlong-common/src/main/java/org/apache/inlong/commons/msg/AttributeConstants.java
b/inlong-common/src/main/java/org/apache/inlong/commons/msg/AttributeConstants.java
new file mode 100644
index 0000000..857d7f0
--- /dev/null
+++
b/inlong-common/src/main/java/org/apache/inlong/commons/msg/AttributeConstants.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.commons.msg;
+
+public interface AttributeConstants {
+
+ String SEPARATOR = "&";
+ String KEY_VALUE_SEPARATOR = "=";
+
+ /**
+ * business id
+ * unique string id for each business or product
+ */
+ String BUSINESS_ID = "bid";
+
+ /**
+ * interface id
+ * unique string id for each interface of business
+ * An interface stand for a kind of data
+ */
+ String INTERFACE_ID = "tid";
+
+ /**
+ * iname is like a tid but used in file protocol(m=xxx)
+ */
+ String INAME = "iname";
+
+ /**
+ * data time
+ */
+ String DATA_TIME = "dt";
+
+ String TIME_STAMP = "t";
+
+ /* compress type */
+ String COMPRESS_TYPE = "cp";
+
+ /* count value for how many records a message body contains */
+ String MESSAGE_COUNT = "cnt";
+
+ /* message type */
+ String MESSAGE_TYPE = "mt";
+
+ /* sort type */
+ String METHOD = "m";
+
+ /* global unique id for a message*/
+ String SEQUENCE_ID = "sid";
+
+ String UNIQ_ID = "uniq";
+
+ /* from where */
+ String FROM = "f";
+
+ String RCV_TIME = "rt";
+
+ String NODE_IP = "NodeIP";
+
+}
diff --git
a/inlong-common/src/main/java/org/apache/inlong/commons/msg/DataInputBuffer.java
b/inlong-common/src/main/java/org/apache/inlong/commons/msg/DataInputBuffer.java
new file mode 100644
index 0000000..352b8a8
--- /dev/null
+++
b/inlong-common/src/main/java/org/apache/inlong/commons/msg/DataInputBuffer.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.commons.msg;
+
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+
+/**
+ * A reusable {@link DataInput} implementation that reads from an in-memory
+ * buffer.
+ *
+ * <p>This saves memory over creating a new DataInputStream and
+ * ByteArrayInputStream each time data is read.
+ *
+ * <p>Typical usage is something like the following:<pre>
+ *
+ * DataInputBuffer buffer = new DataInputBuffer();
+ * while (... loop condition ...) {
+ * byte[] data = ... get data ...;
+ * int dataLength = ... get data length ...;
+ * buffer.reset(data, dataLength);
+ * ... read buffer using DataInput methods ...
+ * }
+ * </pre>
+ */
+public class DataInputBuffer extends DataInputStream {
+ private static class Buffer extends ByteArrayInputStream {
+ public Buffer() {
+ super(new byte[]{
+ //
+ });
+ }
+
+ public void reset(byte[] input, int start, int length) {
+ this.buf = input;
+ this.count = start + length;
+ this.mark = start;
+ this.pos = start;
+ }
+
+ public byte[] getData() {
+ return buf;
+ }
+
+ public int getPosition() {
+ return pos;
+ }
+
+ public int getLength() {
+ return count;
+ }
+ }
+
+ private Buffer buffer;
+
+ /**
+ * Constructs a new empty buffer.
+ */
+ public DataInputBuffer() {
+ this(new Buffer());
+ }
+
+ private DataInputBuffer(Buffer buffer) {
+ super(buffer);
+ this.buffer = buffer;
+ }
+
+ /**
+ * Resets the data that the buffer reads.
+ */
+ public void reset(byte[] input, int length) {
+ buffer.reset(input, 0, length);
+ }
+
+ /**
+ * Resets the data that the buffer reads.
+ */
+ public void reset(byte[] input, int start, int length) {
+ buffer.reset(input, start, length);
+ }
+
+ public byte[] getData() {
+ return buffer.getData();
+ }
+
+ /**
+ * Returns the current position in the input.
+ */
+ public int getPosition() {
+ return buffer.getPosition();
+ }
+
+ /**
+ * Returns the length of the input.
+ */
+ public int getLength() {
+ return buffer.getLength();
+ }
+
+}
diff --git
a/inlong-common/src/main/java/org/apache/inlong/commons/msg/DataOutputBuffer.java
b/inlong-common/src/main/java/org/apache/inlong/commons/msg/DataOutputBuffer.java
new file mode 100644
index 0000000..207800f
--- /dev/null
+++
b/inlong-common/src/main/java/org/apache/inlong/commons/msg/DataOutputBuffer.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.commons.msg;
+
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A reusable {@link DataOutput} implementation that writes to an in-memory
+ * buffer.
+ *
+ * <p>This saves memory over creating a new DataOutputStream and
+ * ByteArrayOutputStream each time data is written.
+ *
+ * <p>Typical usage is something like the following:<pre>
+ *
+ * DataOutputBuffer buffer = new DataOutputBuffer();
+ * while (... loop condition ...) {
+ * buffer.reset();
+ * ... write buffer using DataOutput methods ...
+ * byte[] data = buffer.getData();
+ * int dataLength = buffer.getLength();
+ * ... write data to its ultimate destination ...
+ * }
+ * </pre>
+ */
+public class DataOutputBuffer extends DataOutputStream {
+
+ private static class Buffer extends ByteArrayOutputStream {
+
+ public byte[] getData() {
+ return buf;
+ }
+
+ public int getLength() {
+ return count;
+ }
+
+ public Buffer() {
+ super();
+ }
+
+ public Buffer(int size) {
+ super(size);
+ }
+
+ public void write(DataInput in, int len) throws IOException {
+ int newcount = count + len;
+ if (newcount > buf.length) {
+ byte[] newbuf = new byte[Math.max(buf.length << 1, newcount)];
+ System.arraycopy(buf, 0, newbuf, 0, count);
+ buf = newbuf;
+ }
+ in.readFully(buf, count, len);
+ count = newcount;
+ }
+ }
+
+ private Buffer buffer;
+
+ /**
+ * Constructs a new empty buffer.
+ */
+ public DataOutputBuffer() {
+ this(new Buffer());
+ }
+
+ public DataOutputBuffer(int size) {
+ this(new Buffer(size));
+ }
+
+ private DataOutputBuffer(Buffer buffer) {
+ super(buffer);
+ this.buffer = buffer;
+ }
+
+ /**
+ * Returns the current contents of the buffer.
+ * Data is only valid to {@link #getLength()}.
+ */
+ public byte[] getData() {
+ return buffer.getData();
+ }
+
+ /**
+ * Returns the length of the valid data currently in the buffer.
+ */
+ public int getLength() {
+ return buffer.getLength();
+ }
+
+ /**
+ * Resets the buffer to empty.
+ */
+ public DataOutputBuffer reset() {
+ this.written = 0;
+ buffer.reset();
+ return this;
+ }
+
+ /**
+ * Writes bytes from a DataInput directly into the buffer.
+ */
+ public void write(DataInput in, int length) throws IOException {
+ buffer.write(in, length);
+ }
+
+ /**
+ * Write to a file stream
+ */
+ public void writeTo(OutputStream out) throws IOException {
+ buffer.writeTo(out);
+ }
+}
diff --git
a/inlong-common/src/main/java/org/apache/inlong/commons/msg/TDMsg1.java
b/inlong-common/src/main/java/org/apache/inlong/commons/msg/TDMsg1.java
new file mode 100644
index 0000000..8919f99
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/msg/TDMsg1.java
@@ -0,0 +1,1108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.commons.msg;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.xerial.snappy.Snappy;
+
+public class TDMsg1 {
+ private static final int DEFAULT_CAPACITY = 4096;
+ private final int capacity;
+
+ private static final int BIN_MSG_NO_ZIP = 0;
+ private static final int BIN_MSG_SNAPPY_TYPE = 1;
+
+ private static final int BIN_MSG_TOTALLEN_OFFSET = 0;
+ private static final int BIN_MSG_BID_OFFSET = 5;
+ private static final int BIN_MSG_TID_OFFSET = 7;
+ private static final int BIN_MSG_EXTFIELD_OFFSET = 9;
+ private static final int BIN_MSG_COUNT_OFFSET = 15;
+ private static final int BIN_MSG_DATATIME_OFFSET = 11;
+ private static final int BIN_MSG_TOTALLEN_SIZE = 4;
+ private static final int BIN_MSG_MSGTYPE_OFFSET = 4;
+ private static final int BIN_MSG_SET_SNAPPY = (1 << 5);
+ private static final int BIN_MSG_BODYLEN_SIZE = 4;
+ private static final int BIN_MSG_BODYLEN_OFFSET = 21;
+ private static final int BIN_MSG_BODY_OFFSET =
+ BIN_MSG_BODYLEN_SIZE + BIN_MSG_BODYLEN_OFFSET;
+ private static final int BIN_MSG_ATTRLEN_SIZE = 2;
+ private static final int BIN_MSG_FORMAT_SIZE = 29;
+ private static final int BIN_MSG_MAGIC_SIZE = 2;
+ private static final int BIN_MSG_MAGIC = 0xEE01;
+
+ private static final byte[] MAGIC0 = {(byte) 0xf, (byte) 0x0};
+ // with timestamp
+ private static final byte[] MAGIC1 = {(byte) 0xf, (byte) 0x1};
+ // with msg cnt 20130619
+ private static final byte[] MAGIC2 = {(byte) 0xf, (byte) 0x2};
+ // support msg_type = 6
+ private static final byte[] MAGIC3 = {(byte) 0xf, (byte) 0x3};
+ // support binmsg
+ private static final byte[] MAGIC4 = {(byte) 0xf, (byte) 0x4};
+
+ private final boolean addmode;
+
+ private static final Joiner.MapJoiner MAP_JOINER =
+ Joiner.on(AttributeConstants.SEPARATOR)
+
.withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR);
+ private static final Splitter.MapSplitter MAP_SPLITTER =
+ Splitter.on(AttributeConstants.SEPARATOR)
+
.trimResults().withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR);
+
+ static class DataBuffer {
+
+ DataOutputBuffer out;
+ int cnt;
+
+ public DataBuffer() {
+ out = new DataOutputBuffer();
+ }
+
+ public void write(byte[] array, int position, int len)
+ throws IOException {
+ cnt++;
+ out.writeInt(len);
+ out.write(array, position, len);
+ }
+ }
+
+ private LinkedHashMap<String, DataBuffer> attr2MsgBuffer;
+ private ByteBuffer binMsgBuffer;
+ private int datalen = 0;
+ private int msgcnt = 0;
+ private boolean compress;
+ private boolean isNumBid = false;
+ private boolean ischeck = true;
+
+ private final Version version;
+ private long timeoffset = 0;
+
+ public void setTimeoffset(long offset) {
+ this.timeoffset = offset;
+ }
+
+ private enum Version {
+ vn(-1), v0(0), v1(1),
+ v2(2), v3(3), v4(4);
+
+ private static final Map<Integer, Version> INT_TO_TYPE_MAP =
+ new HashMap<Integer, Version>();
+
+ static {
+ for (Version type : Version.values()) {
+ INT_TO_TYPE_MAP.put(type.value, type);
+ }
+ }
+
+ private final int value;
+
+ private Version(int value) {
+ this.value = value;
+ }
+
+ public int intValue() {
+ return value;
+ }
+
+ public static Version of(int v) {
+ if (!INT_TO_TYPE_MAP.containsKey(v)) {
+ return vn;
+ }
+ return INT_TO_TYPE_MAP.get(v);
+ }
+
+ }
+
+ /**
+ * capacity: 4096, compress: true, version: 1
+ *
+ * @return
+ */
+ public static TDMsg1 newTDMsg() {
+ return newTDMsg(true);
+ }
+
+ /**
+ * capacity: 4096, version: 1
+ *
+ * @param compress if copress
+ * @return TDMsg1
+ */
+ public static TDMsg1 newTDMsg(boolean compress) {
+ return newTDMsg(DEFAULT_CAPACITY, compress);
+ }
+
+ /**
+ * capacity: 4096, compress: true
+ *
+ * @param v version info
+ * @return TDMsg1
+ */
+ public static TDMsg1 newTDMsg(int v) {
+ return newTDMsg(DEFAULT_CAPACITY, true, v);
+ }
+
+ /**
+ * capacity: 4096
+ *
+ * @param compress if compress
+ * @param v version
+ * @return TDMsg1
+ */
+ public static TDMsg1 newTDMsg(boolean compress, int v) {
+ return newTDMsg(DEFAULT_CAPACITY, compress, v);
+ }
+
+ /**
+ * version: 1
+ *
+ * @param capacity data capacity
+ * @param compress if compress
+ * @return TDMsg1
+ */
+ public static TDMsg1 newTDMsg(int capacity, boolean compress) {
+ return new TDMsg1(capacity, compress, Version.v1);
+ }
+
+ /**
+ * @param capacity data capacity
+ * @param compress compress
+ * @param v version
+ * @return TDMsg1
+ */
+ public static TDMsg1 newTDMsg(int capacity, boolean compress, int v) {
+ return new TDMsg1(capacity, compress, Version.of(v));
+ }
+
+ // for create
+ private TDMsg1(int capacity, boolean compress, Version v) {
+ version = v;
+ addmode = true;
+ this.compress = compress;
+ this.capacity = capacity;
+ attr2MsgBuffer = new LinkedHashMap<String, DataBuffer>();
+ parsedInput = null;
+ reset();
+ }
+
+ /**
+ * return false means current msg is big enough, no other data should be
+ * added again, but attention: the input data has already been added, and
if
+ * you add another data after return false it can also be added
+ * successfully.
+ *
+ * @param attr attribute info
+ * @param data binary data
+ * @param offset data start offset
+ * @param len data length
+ * @return boolean
+ */
+ public boolean addMsg(String attr, byte[] data, int offset, int len) {
+ return addMsg(attr, ByteBuffer.wrap(data, offset, len));
+ }
+
+ public boolean addMsg(String attr, ByteBuffer data) {
+ checkMode(true);
+
+ if ((version.intValue() == Version.v3.intValue())
+ && !checkData(data)) {
+ return false;
+ }
+
+ DataBuffer outputBuffer = attr2MsgBuffer.get(attr);
+ if (outputBuffer == null) {
+ outputBuffer = new DataBuffer();
+ attr2MsgBuffer.put(attr, outputBuffer);
+ // attrlen + utflen + meglen + compress
+ this.datalen += attr.length() + 2 + 4 + 1;
+ }
+
+ int len = data.remaining();
+ try {
+ outputBuffer.write(data.array(), data.position(), len);
+ this.datalen += len + 4;
+ if (version.intValue() == Version.v2.intValue()) {
+ this.datalen += 4;
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ return false;
+ }
+ msgcnt++;
+ return checkLen(attr, len);
+ }
+
+ public boolean addMsg(String attr, byte[] data) {
+ return addMsg(attr, ByteBuffer.wrap(data));
+ }
+
+ /* 新增支持新数据消息 */
+ public boolean addMsg(byte[] data) {
+ return addMsg(ByteBuffer.wrap(data));
+ }
+
+ public boolean addMsg(ByteBuffer data) {
+ if (!checkBinData(data)) {
+ return false;
+ }
+
+ if (binMsgBuffer != null) {
+ return false;
+ }
+
+ binMsgBuffer = ByteBuffer.allocate(data.remaining());
+ binMsgBuffer.put(data);
+
+ binMsgBuffer.position(BIN_MSG_TOTALLEN_OFFSET);
+ msgcnt = getBinMsgCnt(binMsgBuffer);
+ return true;
+ }
+
+ private int getBinMsgtype(ByteBuffer data) {
+ return data.get(BIN_MSG_MSGTYPE_OFFSET);
+ }
+
+ private int getBinMsgCnt(ByteBuffer data) {
+ return data.getShort(BIN_MSG_COUNT_OFFSET);
+ }
+
+ private long getBinCreatetime(ByteBuffer data) {
+ return data.getInt(BIN_MSG_DATATIME_OFFSET) * 1000L;
+ }
+
+ private boolean getBinNumFlag(ByteBuffer data) {
+ return (data.getShort(BIN_MSG_EXTFIELD_OFFSET) & 0x4) == 0;
+ }
+
+ private boolean checkBinData(ByteBuffer data) {
+ // 检查消息合法性
+ int totalLen = data.getInt(BIN_MSG_TOTALLEN_OFFSET);
+ int bodyLen = data.getInt(BIN_MSG_BODYLEN_OFFSET);
+ int attrLen = data.getShort(BIN_MSG_BODY_OFFSET + bodyLen);
+ int msgMagic = (data.getShort(BIN_MSG_BODY_OFFSET + bodyLen
+ + BIN_MSG_ATTRLEN_SIZE + attrLen) & 0xFFFF);
+
+ if ((totalLen + BIN_MSG_TOTALLEN_SIZE != (bodyLen + attrLen +
BIN_MSG_FORMAT_SIZE))
+ || (msgMagic != BIN_MSG_MAGIC)) {
+ return false;
+ }
+
+ return true;
+ }
+
+ public boolean addMsgs(String attr, ByteBuffer data) {
+ boolean res = true;
+ Iterator<ByteBuffer> it = getIteratorBuffer(data);
+ setCheckMode(false);
+ while (it.hasNext()) {
+ res = this.addMsg(attr, it.next());
+ }
+ setCheckMode(true);
+ return res;
+ }
+
+ private void setCheckMode(boolean mode) {
+ if (version.intValue() == Version.v3.intValue()) {
+ ischeck = mode;
+ }
+ }
+
+ // Version 3 message, need check data content
+ private boolean checkData(ByteBuffer data) {
+ if ((version.intValue() == Version.v3.intValue()) && !ischeck) {
+ return true;
+ }
+
+ // check data
+ data.mark();
+ int msgnum = 0;
+ while (data.remaining() > 0) {
+ int datalen = data.getInt();
+ if (datalen > data.remaining()) {
+ return false;
+ }
+ msgnum++;
+ byte[] record = new byte[datalen];
+ data.get(record, 0, datalen);
+ }
+
+ msgnum = msgnum / 2;
+ if (msgnum > 1) {
+ msgcnt += msgnum - 1;
+ }
+ data.reset();
+ return true;
+ }
+
+ private boolean checkLen(String attr, int len) {
+ return datalen < capacity;
+ }
+
+ public boolean isfull() {
+ checkMode(true);
+ if (datalen >= capacity) {
+ return true;
+ }
+ return false;
+ }
+
+ private ByteBuffer defaultBuild(long createtime) {
+ try {
+ this.createtime = createtime;
+ DataOutputBuffer out = new DataOutputBuffer(capacity);
+
+ writeHeader(out);
+ out.writeInt(attr2MsgBuffer.size());
+
+ if (compress) {
+ for (String attr : attr2MsgBuffer.keySet()) {
+ out.writeUTF(attr);
+ DataBuffer data = attr2MsgBuffer.get(attr);
+ if (version.intValue() == Version.v2.intValue()) {
+ out.writeInt(data.cnt);
+ }
+ int guessLen =
+ Snappy.maxCompressedLength(data.out.getLength());
+ byte[] tmpData = new byte[guessLen];
+ int len = Snappy.compress(data.out.getData(), 0,
+ data.out.getLength(), tmpData, 0);
+ out.writeInt(len + 1);
+ out.writeBoolean(compress);
+ out.write(tmpData, 0, len);
+ }
+ } else {
+ for (String attr : attr2MsgBuffer.keySet()) {
+ out.writeUTF(attr);
+ DataBuffer data = attr2MsgBuffer.get(attr);
+ if (version.intValue() == Version.v2.intValue()) {
+ out.writeInt(data.cnt);
+ }
+ out.writeInt(data.out.getLength() + 1);
+ out.writeBoolean(compress);
+ out.write(data.out.getData(), 0, data.out.getLength());
+ }
+ }
+ writeMagic(out);
+ out.close();
+ return ByteBuffer.wrap(out.getData(), 0, out.getLength());
+ } catch (IOException e) {
+ e.printStackTrace();
+ return null;
+ }
+ }
+
+ private ByteBuffer binBuild(long createtime) {
+ try {
+ this.createtime = createtime;
+ DataOutputBuffer out = new DataOutputBuffer(capacity);
+
+ writeMagic(out);
+
+ int msgType = getBinMsgtype(binMsgBuffer);
+ int compressType = ((msgType & 0xE0) >> 5);
+ if ((compressType == 0) && (compress)) {
+
+ binMsgBuffer.position(BIN_MSG_BODYLEN_OFFSET);
+ // copy body data
+ int bodyLen = binMsgBuffer.getInt();
+ byte[] body = new byte[bodyLen];
+ binMsgBuffer.get(body, 0, bodyLen);
+
+ // copy attributes
+ int attrLen =
+ binMsgBuffer.getShort(BIN_MSG_BODY_OFFSET + bodyLen);
+ byte[] attr =
+ new byte[BIN_MSG_ATTRLEN_SIZE + attrLen +
BIN_MSG_MAGIC_SIZE];
+ binMsgBuffer.get(attr, 0, attr.length);
+
+ int guessLen = Snappy.maxCompressedLength(bodyLen);
+ byte[] tmpData = new byte[guessLen];
+ int realLen = Snappy.compress(body, 0,
+ body.length, tmpData, 0);
+
+ int totalDataLen =
binMsgBuffer.getInt(BIN_MSG_TOTALLEN_OFFSET);
+ ByteBuffer dataBuf = ByteBuffer.allocate(
+ totalDataLen + BIN_MSG_TOTALLEN_SIZE - body.length +
realLen);
+
+ // copy headers
+ dataBuf.put(binMsgBuffer.array(), 0, BIN_MSG_BODYLEN_OFFSET);
+ // set compress flag
+ dataBuf.put(BIN_MSG_MSGTYPE_OFFSET, (byte) (msgType |
BIN_MSG_SET_SNAPPY));
+ dataBuf.putInt(BIN_MSG_TOTALLEN_OFFSET,
+ realLen + attrLen + BIN_MSG_FORMAT_SIZE - 4);
+ // set data length
+ dataBuf.putInt(BIN_MSG_BODYLEN_OFFSET, realLen);
+ // fill compressed data
+ System.arraycopy(tmpData, 0,
+ dataBuf.array(), BIN_MSG_BODY_OFFSET, realLen);
+ // fill attributes and MAGIC
+ System.arraycopy(attr, 0, dataBuf.array(),
+ BIN_MSG_BODY_OFFSET + realLen, attr.length);
+
+ out.write(dataBuf.array(), 0, dataBuf.capacity());
+ } else {
+ out.write(binMsgBuffer.array(), 0, binMsgBuffer.capacity());
+ }
+
+ writeMagic(out);
+ out.close();
+ return ByteBuffer.wrap(out.getData(), 0, out.getLength());
+ } catch (IOException e) {
+ e.printStackTrace();
+ return null;
+ }
+ }
+
+ public ByteBuffer build() {
+ return build(System.currentTimeMillis() + timeoffset);
+ }
+
+ public ByteBuffer build(long createtime) {
+ checkMode(true);
+ if (version.intValue() != Version.v4.intValue()) {
+ return defaultBuild(createtime);
+ } else {
+ return binBuild(createtime);
+ }
+ }
+
+ private void writeHeader(DataOutputBuffer out) throws IOException {
+ writeMagic(out);
+ if (version.intValue() == Version.v4.intValue()) {
+ return;
+ }
+
+ if (version.intValue() >= Version.v1.intValue()) {
+ // createtime = System.currentTimeMillis() + timeoffset;
+ out.writeLong(createtime);
+ }
+ if (version.intValue() >= Version.v2.intValue()) {
+ out.writeInt(this.getMsgCnt());
+ }
+ }
+
+ private void writeMagic(DataOutputBuffer out) throws IOException {
+ if (version == Version.v1) {
+ out.write(MAGIC1[0]);
+ out.write(MAGIC1[1]);
+ } else if (version == Version.v2) {
+ out.write(MAGIC2[0]);
+ out.write(MAGIC2[1]);
+ } else if (version == Version.v3) {
+ out.write(MAGIC3[0]);
+ out.write(MAGIC3[1]);
+ } else if (version == Version.v4) {
+ out.write(MAGIC4[0]);
+ out.write(MAGIC4[1]);
+ } else {
+ throw new IOException("wrong version : " + version.intValue());
+ }
+ }
+
+ public byte[] buildArray() {
+ return buildArray(System.currentTimeMillis() + timeoffset);
+ }
+
+ public byte[] buildArray(long createtime) {
+ ByteBuffer buffer = this.build(createtime);
+ if (buffer == null) {
+ return null;
+ }
+ byte[] res = new byte[buffer.remaining()];
+ System.arraycopy(buffer.array(), buffer.position(), res, 0,
res.length);
+ return res;
+ }
+
+ public void reset() {
+ checkMode(true);
+ this.attr2MsgBuffer.clear();
+ this.datalen = getHeaderLen();
+ msgcnt = 0;
+ }
+
+ private int getHeaderLen() {
+ int len = 4; // magic
+ if (version.intValue() >= Version.v1.intValue()) {
+ len += 8; // create time
+ }
+ if (version.intValue() == Version.v2.intValue()) {
+ len += 4; // msgcnt
+ }
+
+ return len + 4; // attrcnt
+ }
+
+ // for both mode
+ public int getMsgCnt() {
+ return msgcnt;
+ }
+
+ public int getMsgCnt(String attr) {
+ if (addmode) {
+ return this.attr2MsgBuffer.get(attr).cnt;
+ } else {
+ return this.attr2Rawdata.get(attr).cnt;
+ }
+ }
+
+ private void checkMode(boolean add) {
+ if (addmode != add) {
+ throw new RuntimeException(
+ addmode ? "illegal operation in add mode !!!"
+ : "illegal operation in parse mode !!!");
+ }
+ }
+
+ private int attrcnt = -1;
+
+ // private LinkedHashMap<String, ByteBuffer> attr2Rawdata = null;
+ static class DataByteBuffer {
+ final int cnt;
+ ByteBuffer buffer;
+ DataOutputBuffer inoutBuffer;
+
+ public DataByteBuffer(int cnt, ByteBuffer buffer) {
+ this.cnt = cnt;
+ this.buffer = buffer;
+ }
+
+ public DataByteBuffer(int cnt, DataOutputBuffer inoutbuffer) {
+ this.cnt = cnt;
+ this.inoutBuffer = inoutbuffer;
+ }
+
+ public void syncByteBuffer() {
+ this.buffer = ByteBuffer.wrap(inoutBuffer.getData(), 0,
inoutBuffer.getLength());
+ }
+ }
+
+ private LinkedHashMap<String, DataByteBuffer> attr2Rawdata = null;
+
+ // not used right now
+ // private LinkedHashMap<String, Integer> attr2index = null;
+ private long createtime = -1;
+ private boolean parsed = false;
+ private DataInputBuffer parsedInput;
+ private ByteBuffer parsedBinInput;
+
+ // for parsed
+ private TDMsg1(ByteBuffer buffer, Version magic) throws IOException {
+ version = magic;
+ addmode = false;
+ capacity = 0;
+
+ if (version.intValue() != Version.v4.intValue()) {
+ parsedInput = new DataInputBuffer();
+ parsedInput.reset(buffer.array(), buffer.position() + 2,
+ buffer.remaining());
+ if (version.intValue() >= Version.v1.intValue()) {
+ createtime = parsedInput.readLong();
+ }
+
+ if (version.intValue() >= Version.v2.intValue()) {
+ this.msgcnt = parsedInput.readInt();
+ }
+
+ attrcnt = parsedInput.readInt();
+ } else {
+ byte[] binMsg = new byte[buffer.remaining() - 2];
+ System.arraycopy(buffer.array(),
+ buffer.position() + 2, binMsg, 0, binMsg.length);
+ parsedBinInput = ByteBuffer.wrap(binMsg);
+ this.createtime = getBinCreatetime(parsedBinInput);
+ this.msgcnt = getBinMsgCnt(parsedBinInput);
+ this.isNumBid = getBinNumFlag(parsedBinInput);
+ }
+ }
+
+ private void parseDefault() throws IOException {
+ attr2Rawdata = new LinkedHashMap<String, DataByteBuffer>(
+ attrcnt * 10 / 7);
+ for (int i = 0; i < attrcnt; i++) {
+ String attr = parsedInput.readUTF();
+ int cnt = 0;
+ if (version.intValue() == Version.v2.intValue()) {
+ cnt = parsedInput.readInt();
+ }
+ int len = parsedInput.readInt();
+ int pos = parsedInput.getPosition();
+ attr2Rawdata.put(
+ attr,
+ new DataByteBuffer(cnt, ByteBuffer.wrap(
+ parsedInput.getData(), pos, len)));
+ parsedInput.skip(len);
+ }
+ }
+
+ private void parseMixAttr() throws IOException {
+ attr2Rawdata = new LinkedHashMap<String, DataByteBuffer>(
+ this.msgcnt * 10 / 7);
+
+ for (int i = 0; i < attrcnt; i++) {
+ ByteBuffer bodyBuffer;
+ String commonAttr = parsedInput.readUTF();
+ int len = parsedInput.readInt();
+ int compress = parsedInput.readByte();
+ int pos = parsedInput.getPosition();
+
+ if (compress == 1) {
+ byte[] uncompressdata = new byte[Snappy.uncompressedLength(
+ parsedInput.getData(), pos, len - 1)];
+ int msgLen = Snappy.uncompress(parsedInput.getData(), pos, len
- 1,
+ uncompressdata, 0);
+ bodyBuffer = ByteBuffer.wrap(uncompressdata, 0, msgLen);
+ } else {
+ bodyBuffer = ByteBuffer.wrap(parsedInput.getData(), pos, len -
1);
+ }
+ parsedInput.skip(len - 1);
+
+ while (bodyBuffer.remaining() > 0) {
+ // total message length = (data length + attributes length) * N
+ int singleTotalLen = bodyBuffer.getInt();
+ if (singleTotalLen > bodyBuffer.remaining()) {
+ return;
+ }
+
+ while (singleTotalLen > 0) {
+ // single data length
+ int msgItemLen = bodyBuffer.getInt();
+ if (msgItemLen <= 0 || msgItemLen > singleTotalLen) {
+ return;
+ }
+
+ byte[] record = new byte[1 + 4 + msgItemLen];
+ record[0] = 0;
+ record[1] = (byte) ((msgItemLen >> 24) & 0xFF);
+ record[2] = (byte) ((msgItemLen >> 16) & 0xFF);
+ record[3] = (byte) ((msgItemLen >> 8) & 0xFF);
+ record[4] = (byte) (msgItemLen & 0xFF);
+ bodyBuffer.get(record, 1 + 4, msgItemLen);
+
+ // single attribute length
+ int singleAttrLen = bodyBuffer.getInt();
+ if (singleAttrLen <= 0 || singleAttrLen > singleTotalLen) {
+ return;
+ }
+ byte[] attrBuf = new byte[singleAttrLen];
+ bodyBuffer.get(attrBuf, 0, singleAttrLen);
+ String finalAttr = commonAttr + "&" + new String(attrBuf);
+
+ DataByteBuffer inputBuffer = attr2Rawdata.get(finalAttr);
+ if (inputBuffer == null) {
+ inputBuffer = new DataByteBuffer(0,
+ new DataOutputBuffer(msgItemLen + 4 + 1));
+ attr2Rawdata.put(finalAttr, inputBuffer);
+ inputBuffer.inoutBuffer.write(record, 0, msgItemLen +
4 + 1);
+ } else {
+ inputBuffer.inoutBuffer.write(record, 1, msgItemLen +
4);
+ }
+ singleTotalLen = singleTotalLen - msgItemLen -
singleAttrLen - 8;
+ }
+ }
+ }
+
+ // sync data
+ for (String attr : attr2Rawdata.keySet()) {
+ DataByteBuffer data = attr2Rawdata.get(attr);
+ data.syncByteBuffer();
+ }
+ }
+
+ private void parseBinMsg() throws IOException {
+ Map<String, String> commonAttrMap = new HashMap<String, String>();
+
+ int totalLen = parsedBinInput.getInt(BIN_MSG_TOTALLEN_OFFSET);
+ final int msgtype = parsedBinInput.get(BIN_MSG_MSGTYPE_OFFSET);
+ int bidNum = parsedBinInput.getShort(BIN_MSG_BID_OFFSET);
+ int tidNum = parsedBinInput.getShort(BIN_MSG_TID_OFFSET);
+ int bodyLen = parsedBinInput.getInt(BIN_MSG_BODYLEN_OFFSET);
+ long dataTime = parsedBinInput.getInt(BIN_MSG_DATATIME_OFFSET);
+ final int extField = parsedBinInput.getShort(BIN_MSG_EXTFIELD_OFFSET);
+ int attrLen = parsedBinInput.getShort(BIN_MSG_BODY_OFFSET + bodyLen);
+ int msgMagic = (parsedBinInput.getShort(BIN_MSG_BODY_OFFSET
+ + bodyLen + BIN_MSG_ATTRLEN_SIZE + attrLen) & 0xFFFF);
+ dataTime = dataTime * 1000;
+
+ //read common attributes
+ if (attrLen != 0) {
+ byte[] attr = new byte[attrLen];
+ parsedBinInput.position(BIN_MSG_BODY_OFFSET + bodyLen +
BIN_MSG_ATTRLEN_SIZE);
+ parsedBinInput.get(attr);
+ String strAttr = new String(attr);
+
+ commonAttrMap = new HashMap<String,
String>(MAP_SPLITTER.split(strAttr));
+ }
+
+ commonAttrMap.put(AttributeConstants.DATA_TIME,
String.valueOf(dataTime));
+
+ //unzip data
+ ByteBuffer bodyBuffer;
+ byte[] body = new byte[bodyLen + 1];
+ parsedBinInput.position(BIN_MSG_BODY_OFFSET);
+ parsedBinInput.get(body, 1, bodyLen);
+ int zipType = (msgtype & 0xE0) >> 5;
+ switch (zipType) {
+ case (BIN_MSG_SNAPPY_TYPE):
+ byte[] uncompressdata =
+ new byte[Snappy.uncompressedLength(body, 1,
body.length - 1) + 1];
+ // uncompress flag
+ uncompressdata[0] = 0;
+ int msgLen = Snappy.uncompress(body, 1, body.length - 1,
+ uncompressdata, 1);
+ bodyBuffer = ByteBuffer.wrap(uncompressdata, 0, msgLen + 1);
+ break;
+
+ case (BIN_MSG_NO_ZIP):
+ default:
+ //set uncompress flag
+ body[0] = 0;
+ bodyBuffer = ByteBuffer.wrap(body, 0, body.length);
+ break;
+ }
+
+ //number bid/tid
+ boolean isUseNumBid = ((extField & 0x4) == 0x0);
+ if (isUseNumBid) {
+ commonAttrMap.put(AttributeConstants.BUSINESS_ID,
String.valueOf(bidNum));
+ commonAttrMap.put(AttributeConstants.INTERFACE_ID,
String.valueOf(tidNum));
+ }
+
+ boolean hasOtherAttr = ((extField & 0x1) == 0x1);
+ commonAttrMap.put(AttributeConstants.MESSAGE_COUNT, "1");
+ // with private attributes,
+ // need to splice private attributes + public attributes
+ if (!hasOtherAttr) {
+ // general attributes and data map
+ attr2Rawdata = new LinkedHashMap<String, DataByteBuffer>();
+ attr2Rawdata.put(MAP_JOINER.join(commonAttrMap),
+ new DataByteBuffer(0, bodyBuffer));
+ } else {
+ attr2Rawdata = new LinkedHashMap<String, DataByteBuffer>(
+ this.msgcnt * 10 / 7);
+ Map<String, String> finalAttrMap = commonAttrMap;
+
+ //skip compress flag
+ bodyBuffer.get();
+ int bodyBufLen = bodyBuffer.capacity() - 1;
+ while (bodyBufLen > 0) {
+ // get single message length
+ int singleMsgLen = bodyBuffer.getInt();
+ if (singleMsgLen <= 0 || singleMsgLen > bodyBufLen) {
+ return;
+ }
+
+ byte[] record = new byte[1 + 4 + singleMsgLen];
+ record[0] = 0;
+ record[1] = (byte) ((singleMsgLen >> 24) & 0xFF);
+ record[2] = (byte) ((singleMsgLen >> 16) & 0xFF);
+ record[3] = (byte) ((singleMsgLen >> 8) & 0xFF);
+ record[4] = (byte) (singleMsgLen & 0xFF);
+ bodyBuffer.get(record, 1 + 4, singleMsgLen);
+
+ // get single attribute length
+ int singleAttrLen = bodyBuffer.getInt();
+ if (singleAttrLen <= 0 || singleAttrLen > bodyBufLen) {
+ return;
+ }
+ byte[] attrBuf = new byte[singleAttrLen];
+ bodyBuffer.get(attrBuf, 0, singleAttrLen);
+ String attrBufStr = new String(attrBuf);
+
+ finalAttrMap = new HashMap<String,
String>(MAP_SPLITTER.split(attrBufStr));
+ finalAttrMap.putAll(commonAttrMap);
+
+ DataByteBuffer inputBuffer =
attr2Rawdata.get(MAP_JOINER.join(finalAttrMap));
+ if (inputBuffer == null) {
+ inputBuffer = new DataByteBuffer(0,
+ new DataOutputBuffer(singleMsgLen + 4 + 1));
+ attr2Rawdata.put(MAP_JOINER.join(finalAttrMap),
inputBuffer);
+ inputBuffer.inoutBuffer.write(record, 0, singleMsgLen + 4
+ 1);
+ } else {
+ inputBuffer.inoutBuffer.write(record, 1, singleMsgLen + 4);
+ }
+
+ bodyBufLen = bodyBufLen - singleMsgLen - singleAttrLen - 8;
+ }
+
+ // sync data
+ for (String attr : attr2Rawdata.keySet()) {
+ DataByteBuffer data = attr2Rawdata.get(attr);
+ data.syncByteBuffer();
+ }
+ }
+ }
+
+ private void parse() throws IOException {
+ if (parsed) {
+ return;
+ }
+
+ if (version.intValue() < Version.v3.intValue()) {
+ parseDefault();
+ } else if (version.intValue() == Version.v3.intValue()) {
+ parseMixAttr();
+ } else {
+ parseBinMsg();
+ }
+
+ parsed = true;
+ }
+
+ private static Version getMagic(ByteBuffer buffer) {
+ // #lizard forgives
+ byte[] array = buffer.array();
+ if (buffer.remaining() < 4) {
+ return Version.vn;
+ }
+ int pos = buffer.position();
+ int rem = buffer.remaining();
+ if (array[pos] == MAGIC1[0] && array[pos + 1] == MAGIC1[1]
+ && array[pos + rem - 2] == MAGIC1[0]
+ && array[pos + rem - 1] == MAGIC1[1]) {
+ return Version.v1;
+ }
+ if (array[pos] == MAGIC2[0] && array[pos + 1] == MAGIC2[1]
+ && array[pos + rem - 2] == MAGIC2[0]
+ && array[pos + rem - 1] == MAGIC2[1]) {
+ return Version.v2;
+ }
+ if (array[pos] == MAGIC3[0] && array[pos + 1] == MAGIC3[1]
+ && array[pos + rem - 2] == MAGIC3[0]
+ && array[pos + rem - 1] == MAGIC3[1]) {
+ return Version.v3;
+ }
+ if (array[pos] == MAGIC4[0] && array[pos + 1] == MAGIC4[1]
+ && array[pos + rem - 2] == MAGIC4[0]
+ && array[pos + rem - 1] == MAGIC4[1]) {
+ return Version.v4;
+ }
+ if (array[pos] == MAGIC0[0] && array[pos + 1] == MAGIC0[1]
+ && array[pos + rem - 2] == MAGIC0[0]
+ && array[pos + rem - 1] == MAGIC0[1]) {
+ return Version.v0;
+ }
+ return Version.vn;
+ }
+
+ public static TDMsg1 parseFrom(byte[] data) {
+ return parseFrom(ByteBuffer.wrap(data));
+ }
+
+ public static TDMsg1 parseFrom(ByteBuffer buffer) {
+ Version magic = getMagic(buffer);
+ if (magic == Version.vn) {
+ return null;
+ }
+
+ try {
+ return new TDMsg1(buffer, magic);
+ } catch (IOException e) {
+ return null;
+ }
+ }
+
+ private void makeSureParsed() {
+ if (!parsed) {
+ try {
+ parse();
+ } catch (IOException e) {
+ //
+ }
+ }
+ }
+
+ public Set<String> getAttrs() {
+ checkMode(false);
+ makeSureParsed();
+ return this.attr2Rawdata.keySet();
+ }
+
+ public byte[] getRawData(String attr) {
+ checkMode(false);
+ makeSureParsed();
+ ByteBuffer buffer = getRawDataBuffer(attr);
+ byte[] data = new byte[buffer.remaining()];
+ System.arraycopy(buffer.array(), buffer.position(), data, 0,
+ buffer.remaining());
+ return data;
+ }
+
+ public ByteBuffer getRawDataBuffer(String attr) {
+ checkMode(false);
+ makeSureParsed();
+ return this.attr2Rawdata.get(attr).buffer;
+ }
+
+ public Iterator<byte[]> getIterator(String attr) {
+ checkMode(false);
+ makeSureParsed();
+ return getIterator(this.attr2Rawdata.get(attr).buffer);
+ }
+
+ public static Iterator<byte[]> getIterator(byte[] rawdata) {
+ return getIterator(ByteBuffer.wrap(rawdata));
+ }
+
+ public static Iterator<byte[]> getIterator(ByteBuffer rawdata) {
+ try {
+ final DataInputBuffer input = new DataInputBuffer();
+ byte[] array = rawdata.array();
+ int pos = rawdata.position();
+ int rem = rawdata.remaining() - 1;
+ int compress = array[pos];
+
+ if (compress == 1) {
+ byte[] uncompressdata = new byte[Snappy.uncompressedLength(
+ array, pos + 1, rem)];
+ int len = Snappy.uncompress(array, pos + 1, rem,
+ uncompressdata, 0);
+ input.reset(uncompressdata, len);
+ } else {
+ input.reset(array, pos + 1, rem);
+ }
+
+ return new Iterator<byte[]>() {
+
+ @Override
+ public boolean hasNext() {
+ try {
+ return input.available() > 0;
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return false;
+ }
+
+ @Override
+ public byte[] next() {
+ try {
+ int len;
+ len = input.readInt();
+ byte[] res = new byte[len];
+ input.read(res);
+ return res;
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return null;
+ }
+
+ @Override
+ public void remove() {
+ this.next();
+ }
+ };
+ } catch (IOException e) {
+ e.printStackTrace();
+ return null;
+ }
+
+ }
+
+ public static Iterator<ByteBuffer> getIteratorBuffer(byte[] rawdata) {
+ return getIteratorBuffer(ByteBuffer.wrap(rawdata));
+ }
+
+ public Iterator<ByteBuffer> getIteratorBuffer(String attr) {
+ checkMode(false);
+ makeSureParsed();
+ return getIteratorBuffer(this.attr2Rawdata.get(attr).buffer);
+ }
+
+ public static Iterator<ByteBuffer> getIteratorBuffer(ByteBuffer rawdata) {
+
+ try {
+ final DataInputBuffer input = new DataInputBuffer();
+ byte[] array = rawdata.array();
+ int pos = rawdata.position();
+ int rem = rawdata.remaining() - 1;
+ int compress = array[pos];
+
+ if (compress == 1) {
+ byte[] uncompressdata = new byte[Snappy.uncompressedLength(
+ array, pos + 1, rem)];
+ int len = Snappy.uncompress(array, pos + 1, rem,
+ uncompressdata, 0);
+ input.reset(uncompressdata, len);
+ } else {
+ input.reset(array, pos + 1, rem);
+ }
+
+ final byte[] uncompressdata = input.getData();
+
+ return new Iterator<ByteBuffer>() {
+
+ @Override
+ public boolean hasNext() {
+ try {
+ return input.available() > 0;
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return false;
+ }
+
+ @Override
+ public ByteBuffer next() {
+ try {
+ int len = input.readInt();
+ int pos = input.getPosition();
+ input.skip(len);
+ return ByteBuffer.wrap(uncompressdata, pos, len);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return null;
+ }
+
+ @Override
+ public void remove() {
+ this.next();
+ }
+ };
+ } catch (IOException e) {
+ e.printStackTrace();
+ return null;
+ }
+
+ }
+
+ public long getCreatetime() {
+ return createtime;
+ }
+
+ public int getAttrCount() {
+ checkMode(false);
+ return attrcnt;
+ }
+
+ public boolean isNumBid() {
+ checkMode(false);
+ return isNumBid;
+ }
+}
diff --git
a/inlong-common/src/main/java/org/apache/inlong/commons/msg/TDMsgAttrBuilder.java
b/inlong-common/src/main/java/org/apache/inlong/commons/msg/TDMsgAttrBuilder.java
new file mode 100644
index 0000000..df90de0
--- /dev/null
+++
b/inlong-common/src/main/java/org/apache/inlong/commons/msg/TDMsgAttrBuilder.java
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.commons.msg;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+public class TDMsgAttrBuilder {
+
+ public enum PartitionUnit {
+ DAY("d"), HOUR("h"), HALFHOUR("n"),
+ QUARTER("q"), TENMINS("t"), FIVEMINS("f");
+ private static final Map<String, PartitionUnit> STRING_TO_TYPE_MAP =
+ new HashMap<String, PartitionUnit>();
+
+ static {
+ for (PartitionUnit type : PartitionUnit.values()) {
+ STRING_TO_TYPE_MAP.put(type.value, type);
+ }
+ }
+
+ private final String value;
+
+ private PartitionUnit(String value) {
+ this.value = value;
+ }
+
+ public static PartitionUnit of(String p) {
+ PartitionUnit type = STRING_TO_TYPE_MAP.get(p);
+ if (type == null) {
+ return PartitionUnit.HOUR;
+ }
+ return type;
+ }
+
+ @Override
+ public String toString() {
+ return value;
+ }
+ }
+
+ public enum TimeType {
+ MS("#ms"), S("#s"),
+ STANDARD("#")/* yyyy-MM-dd HH:mm:ss */,
+ NORMAL("#n")/* yyyyMMddHH */;
+ private static final Map<String, TimeType> STRING_TO_TYPE_MAP =
+ new HashMap<String, TimeType>();
+
+ static {
+ for (TimeType type : TimeType.values()) {
+ STRING_TO_TYPE_MAP.put(type.value, type);
+ }
+ }
+
+ private final String value;
+
+ private TimeType(String value) {
+ this.value = value;
+ }
+
+ public static TimeType of(String tt) {
+ TimeType type = STRING_TO_TYPE_MAP.get(tt);
+ if (type == null) {
+ return TimeType.STANDARD;
+ }
+ return type;
+ }
+
+ @Override
+ public String toString() {
+ return value;
+ }
+ }
+
+ public static class MsgAttrProtocolM0 {
+ private final StringBuffer attrBuffer;
+ private String id = null;
+ private String t = null;
+ private TimeType tt = TimeType.NORMAL;
+ private PartitionUnit p = PartitionUnit.HOUR;
+
+ public MsgAttrProtocolM0() {
+ attrBuffer = new StringBuffer();
+ attrBuffer.append("m=0");
+ }
+
+ public MsgAttrProtocolM0 setId(String id) {
+ this.id = id;
+ return this;
+ }
+
+ public MsgAttrProtocolM0 setSpliter(String s) {
+ attrBuffer.append("&s=").append(s);
+ return this;
+ }
+
+ public MsgAttrProtocolM0 setTime(String t) {
+ this.t = t;
+ return this;
+ }
+
+ public MsgAttrProtocolM0 setTime(long t) {
+ return this.setTime(String.valueOf(t));
+ }
+
+ public MsgAttrProtocolM0 setTimeType(String tt) {
+ this.tt = TimeType.of(tt);
+ return this;
+ }
+
+ public MsgAttrProtocolM0 setTimeType(TimeType tt) {
+ this.tt = tt;
+ return this;
+ }
+
+ public MsgAttrProtocolM0 setPartitionUnit(String p) {
+ this.p = PartitionUnit.of(p);
+ return this;
+ }
+
+ public MsgAttrProtocolM0 setPartitionUnit(PartitionUnit p) {
+ this.p = p;
+ return this;
+ }
+
+ public String buildAttr() throws Exception {
+ if (id == null) {
+ throw new Exception("id is null");
+ }
+
+ if (t == null) {
+ throw new Exception("t is null");
+ }
+
+ attrBuffer.append("&iname=").append(id);
+
+ Date d = transData(this.tt, t);
+ String tstr = null;
+ if (this.p == PartitionUnit.DAY) {
+ SimpleDateFormat f = new SimpleDateFormat("yyyyMMdd");
+ tstr = f.format(d);
+ } else if (this.p == PartitionUnit.HOUR) {
+ SimpleDateFormat f = new SimpleDateFormat("yyyyMMddHH");
+ tstr = f.format(d);
+ } else if (this.p == PartitionUnit.QUARTER) {
+ int idx =
+ (int) ((d.getTime() % (60L * 60 * 1000)) / (15L * 60 *
1000));
+ SimpleDateFormat f = new SimpleDateFormat("yyyyMMddHH");
+ tstr = f.format(d) + "q" + idx;
+ }
+
+ return attrBuffer.append("&t=").append(tstr).toString();
+ }
+ }
+
+ public static class MsgAttrProtocolM100 {
+
+ private final StringBuffer attrBuffer;
+ private String id = null;
+ private String t = null;
+
+ private int idp = -1;
+ private int tp = -1;
+ private TimeType tt = null;
+ private PartitionUnit p = null;
+
+ public MsgAttrProtocolM100() {
+ attrBuffer = new StringBuffer();
+ attrBuffer.append("m=100");
+ }
+
+ public MsgAttrProtocolM100 setSpliter(String s) {
+ attrBuffer.append("&s=").append(s);
+ return this;
+ }
+
+ public MsgAttrProtocolM100 setId(String id) {
+ this.id = id;
+ return this;
+ }
+
+ public MsgAttrProtocolM100 setTime(String t) {
+ this.t = t;
+ return this;
+ }
+
+ public MsgAttrProtocolM100 setTime(long t) {
+ return this.setTime(String.valueOf(t));
+ }
+
+ public MsgAttrProtocolM100 setIdPos(int idp) {
+ this.idp = idp;
+ return this;
+ }
+
+ public MsgAttrProtocolM100 setTimePos(int tp) {
+ this.tp = tp;
+ return this;
+ }
+
+ public MsgAttrProtocolM100 setTimeType(String tt) {
+ return this.setTimeType(TimeType.of(tt));
+ }
+
+ public MsgAttrProtocolM100 setTimeType(TimeType tt) {
+ this.tt = tt;
+ return this;
+ }
+
+ public MsgAttrProtocolM100 setPartitionUnit(String p) {
+ return this.setPartitionUnit(PartitionUnit.of(p));
+ }
+
+ public MsgAttrProtocolM100 setPartitionUnit(PartitionUnit p) {
+ this.p = p;
+ return this;
+ }
+
+ public String buildAttr() throws Exception {
+ // #lizard forgives
+ if (id != null) {
+ attrBuffer.append("&iname=").append(id);
+ } else if (idp >= 0) {
+ attrBuffer.append("&idp=").append(idp);
+ }
+ if (t != null) {
+ String tstr = null;
+ if (tt != null && tt == TimeType.NORMAL) {
+ if (makeSureTimeNormal(t)) {
+ tstr = t;
+ }
+ } else {
+ if (this.p == null) {
+ this.p = PartitionUnit.HOUR;
+ }
+ Date d = transData(tt, t);
+ if (this.p == PartitionUnit.DAY) {
+ SimpleDateFormat f = new SimpleDateFormat("yyyyMMdd");
+ tstr = f.format(d);
+ } else if (this.p == PartitionUnit.HOUR) {
+ SimpleDateFormat f = new
SimpleDateFormat("yyyyMMddHH");
+ tstr = f.format(d);
+ } else if (this.p == PartitionUnit.HALFHOUR) {
+ int idx =
+ (int) ((d.getTime() % (60L * 60 * 1000)) /
(30L * 60 * 1000));
+ SimpleDateFormat f = new
SimpleDateFormat("yyyyMMddHH");
+ tstr = f.format(d) + "n" + idx;
+ } else if (this.p == PartitionUnit.QUARTER) {
+ int idx =
+ (int) ((d.getTime() % (60L * 60 * 1000)) /
(15L * 60 * 1000));
+ SimpleDateFormat f = new
SimpleDateFormat("yyyyMMddHH");
+ tstr = f.format(d) + "q" + idx;
+ } else if (this.p == PartitionUnit.TENMINS) {
+ int idx =
+ (int) ((d.getTime() % (60L * 60 * 1000)) /
(10L * 60 * 1000));
+ SimpleDateFormat f = new
SimpleDateFormat("yyyyMMddHH");
+ tstr = f.format(d) + "t" + idx;
+ } else if (this.p == PartitionUnit.FIVEMINS) {
+ int idx =
+ (int) ((d.getTime() % (60L * 60 * 1000)) / (5L
* 60 * 1000));
+ SimpleDateFormat f = new
SimpleDateFormat("yyyyMMddHH");
+ tstr = f.format(d) + "f" + idx;
+ }
+ }
+
+ if (tstr != null) {
+ attrBuffer.append("&t=").append(tstr);
+ }
+
+ } else if (tp >= 0) {
+ attrBuffer.append("&tp=").append(tp);
+ if (this.tt != null) {
+ attrBuffer.append("&tt=").append(tt);
+ }
+ if (this.p != null) {
+ attrBuffer.append("&p=").append(p.toString());
+ }
+ }
+ return attrBuffer.toString();
+ }
+
+ private boolean makeSureTimeNormal(String time) {
+ int len = time.length();
+ return len == 8 || len == 10
+ || (len == 12 && time.charAt(10) == 'p');
+ }
+
+ }
+
+ public static MsgAttrProtocolM0 getProtololM0() {
+ return new MsgAttrProtocolM0();
+ }
+
+ public static MsgAttrProtocolM100 getProtololM100() {
+ return new MsgAttrProtocolM100();
+ }
+
+ public static void main(String[] args) throws Exception {
+
+ SimpleDateFormat f =
+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ SimpleDateFormat f1 =
+ new SimpleDateFormat("yyyyMMddHH");
+
+ System.out.println(TDMsgAttrBuilder.getProtololM0()
+ .setId("interfaceid").setTimeType(TimeType.S)
+ .setTime(String.valueOf(System.currentTimeMillis() / 1000))
+ .buildAttr());
+ System.out.println(TDMsgAttrBuilder.getProtololM0()
+ .setId("interfaceid").setTime(System.currentTimeMillis())
+ .setTimeType(TimeType.MS).buildAttr());
+ System.out.println(TDMsgAttrBuilder.getProtololM0()
+ .setId("interfaceid")
+ .setTime(f1.format(new Date(System.currentTimeMillis())))
+ .buildAttr());
+ System.out.println(TDMsgAttrBuilder.getProtololM0()
+ .setId("interfaceid")
+ .setTime(f.format(new Date(System.currentTimeMillis())))
+ .setTimeType(TimeType.STANDARD).buildAttr());
+ System.out.println(TDMsgAttrBuilder.getProtololM0()
+ .setId("interfaceid")
+ .setTime(f1.format(new Date(System.currentTimeMillis())))
+ .setTimeType(TimeType.NORMAL).buildAttr());
+ System.out.println(TDMsgAttrBuilder.getProtololM0()
+ .setId("interfaceid").setTimeType(TimeType.S)
+ .setTime(String.valueOf(System.currentTimeMillis() / 1000))
+ .setPartitionUnit(PartitionUnit.DAY).buildAttr());
+ System.out.println(TDMsgAttrBuilder.getProtololM0()
+ .setId("interfaceid").setTimeType(TimeType.S)
+ .setTime(String.valueOf(System.currentTimeMillis() / 1000))
+ .setPartitionUnit(PartitionUnit.HOUR).buildAttr());
+ System.out.println(TDMsgAttrBuilder.getProtololM0()
+ .setId("interfaceid").setTimeType(TimeType.S)
+ .setTime(String.valueOf(System.currentTimeMillis() / 1000))
+ .setPartitionUnit(PartitionUnit.QUARTER).buildAttr());
+ System.out.println();
+
+ System.out.print(TDMsgAttrBuilder.getProtololM100().buildAttr());
+ System.out.println("\t\t\t\t\t\t// ---- all the param is "
+ + "default : s=\\t, idp=0, tp=1, tt=#ms, p=h ");
+
+ System.out.print(TDMsgAttrBuilder.getProtololM100().setSpliter(",")
+ .buildAttr());
+ System.out.println("\t\t\t\t\t// ---- : idp=0, tp=1, tt=#ms, p=h ");
+
+ System.out.print(TDMsgAttrBuilder.getProtololM100().setSpliter(",")
+ .setIdPos(0).buildAttr());
+ System.out.println("\t\t\t\t\t// ---- : tp=1, tt=#ms, p=h ");
+
+ System.out.print(TDMsgAttrBuilder.getProtololM100().setSpliter(",")
+ .setTimePos(1).buildAttr());
+ System.out.println("\t\t\t\t\t// ---- : idp=0, tt=#ms, p=h ");
+
+ System.out.print(TDMsgAttrBuilder.getProtololM100().setSpliter(",")
+ .setIdPos(0).setTimePos(1).buildAttr());
+ System.out.println("\t\t\t\t// ---- : tt=#ms, p=h ");
+
+ System.out.print(TDMsgAttrBuilder.getProtololM100().setSpliter(",")
+
.setIdPos(0).setTimePos(1).setTimeType(TimeType.S).buildAttr());
+ System.out.println("\t\t\t// ---- : p=h ");
+
+ System.out.print(TDMsgAttrBuilder.getProtololM100().setSpliter(",")
+ .setIdPos(0).setTimePos(1)
+ .setPartitionUnit(PartitionUnit.QUARTER).buildAttr());
+ System.out.println("\t\t\t// ---- : tt=#s ");
+
+ System.out.print(TDMsgAttrBuilder.getProtololM100().setSpliter(",")
+ .setIdPos(0).setTimePos(1).setTimeType(TimeType.MS)
+ .setPartitionUnit(PartitionUnit.QUARTER).buildAttr());
+ System.out.println("\t\t\t// ---- : all ");
+
+ System.out.print(TDMsgAttrBuilder.getProtololM100()
+ .setId("interfaceid").setSpliter(",").setIdPos(0).setTimePos(1)
+ .setTimeType(TimeType.MS)
+ .setPartitionUnit(PartitionUnit.QUARTER).buildAttr());
+ System.out.println("\t// ---- : id is set so idp is ignored ");
+
+ System.out.print(TDMsgAttrBuilder.getProtololM100().setSpliter(",")
+ .setIdPos(0).setTimePos(1).setTime(System.currentTimeMillis())
+ .setTimeType(TimeType.MS)
+ .setPartitionUnit(PartitionUnit.QUARTER).buildAttr());
+ System.out.println("\t\t\t// ---- : t is set so tp is ignored ");
+
+ System.out.print(TDMsgAttrBuilder.getProtololM100()
+ .setId("interfaceid").setSpliter(",").setIdPos(0).setTimePos(1)
+ .setTime(System.currentTimeMillis()).setTimeType(TimeType.MS)
+ .setPartitionUnit(PartitionUnit.QUARTER).buildAttr());
+ System.out
+ .println("\t\t// ---- : id and t are all set so idpos and tp
are all ignored ");
+
+ System.out.print(TDMsgAttrBuilder.getProtololM100()
+ .setId("interfaceid").setSpliter(",").setIdPos(0).setTimePos(1)
+ .setTime(f1.format(new Date(System.currentTimeMillis())))
+ .setTimeType(TimeType.NORMAL)
+ .setPartitionUnit(PartitionUnit.QUARTER).buildAttr());
+ System.out.println("\t\t// ---- : TimeType.NORMAL ");
+
+ System.out
+ .println("\nAttention !!!! m=0 is contained by m=100, so just
use m=100");
+
+ // long time = System.currentTimeMillis();
+ // byte[] data0 = ("id," + time + ",other,data").getBytes();
+ // String attr = TDMsgProtocolFactory.getProtololM100().setSpliter(",")
+ // .setIdPos(0).setTimePos(1).setTimeType(TimeType.MS)
+ // .setPartitionUnit(PartitionUnit.QUARTER).buildAttr();
+ // TDMsg1 tdmsg = TDMsg1.newTDMsg();
+ // tdmsg.addMsg(attr, data0);
+ // byte[] result = tdmsg.buildArray();
+ // System.out.println(new String(result));
+ // attr = TDMsgProtocolFactory.getProtololM0().setSpliter(",")
+ // .setTime(System.currentTimeMillis()).setTimeType(TimeType.MS)
+ // .setPartitionUnit(PartitionUnit.QUARTER).buildAttr();
+ // tdmsg = TDMsg1.newTDMsg();
+ // tdmsg.addMsg(attr, data0);
+ // result = tdmsg.buildArray();
+ // System.out.println(new String(result));
+ System.out.print(TDMsgAttrBuilder.getProtololM100().setSpliter(",")
+ .setIdPos(0).setTimePos(1)
+
+ .setTimeType(TimeType.STANDARD)
+ .setPartitionUnit(PartitionUnit.QUARTER).buildAttr());
+
+ }
+
+ private static Date transData(TimeType tt, String timeStr) throws
Exception {
+ Date d = null;
+ if (tt == TimeType.MS) {
+ d = new Date(Long.valueOf(timeStr));
+ } else if (tt == TimeType.S) {
+ d = new Date(Long.valueOf(timeStr) * 1000);
+ } else if (tt == TimeType.STANDARD) {
+ SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ d = f.parse(timeStr);
+ } else if (tt == TimeType.NORMAL) {
+ SimpleDateFormat f = new SimpleDateFormat("yyyyMMddHH");
+ d = f.parse(timeStr);
+ }
+ return d;
+ }
+}
diff --git a/pom.xml b/pom.xml
index 362adf6..7d26759 100644
--- a/pom.xml
+++ b/pom.xml
@@ -84,6 +84,7 @@
</mailingLists>
<modules>
+ <module>inlong-common</module>
<module>inlong-tubemq</module>
</modules>