This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-613
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/INLONG-613 by this push:
     new eca5788  [INLONG-641] Add inlong-common module (#486)
eca5788 is described below

commit eca5788a5ce94fed7236982223989da189dfde8e
Author: gosonzhang <[email protected]>
AuthorDate: Wed Jun 30 16:56:36 2021 +0800

    [INLONG-641] Add inlong-common module (#486)
    
    Co-authored-by: gosonzhang <[email protected]>
---
 inlong-common/pom.xml                              |  148 +++
 .../inlong/commons/msg/AttributeConstants.java     |   74 ++
 .../apache/inlong/commons/msg/DataInputBuffer.java |  116 ++
 .../inlong/commons/msg/DataOutputBuffer.java       |  133 +++
 .../java/org/apache/inlong/commons/msg/TDMsg1.java | 1108 ++++++++++++++++++++
 .../inlong/commons/msg/TDMsgAttrBuilder.java       |  457 ++++++++
 pom.xml                                            |    1 +
 7 files changed, 2037 insertions(+)

diff --git a/inlong-common/pom.xml b/inlong-common/pom.xml
new file mode 100644
index 0000000..265f3b4
--- /dev/null
+++ b/inlong-common/pom.xml
@@ -0,0 +1,148 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.inlong</groupId>
+        <artifactId>inlong</artifactId>
+        <version>0.9.0-incubating-SNAPSHOT</version>
+    </parent>
+    <packaging>jar</packaging>
+    <groupId>org.apache.inlong</groupId>
+    <artifactId>inlong-common</artifactId>
+    <name>Apache InLong - Common</name>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.xerial.snappy</groupId>
+            <artifactId>snappy-java</artifactId>
+            <version>1.1.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.6.4</version>
+            <type>jar</type>
+            <scope>compile</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.4</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>11.0.2</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <sourceDirectory>src/main/java</sourceDirectory>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+        </resources>
+        <plugins>
+            <!-- Bind the maven-assembly-plugin to the package phase; this will create
+                a jar file without the storm dependencies, suitable for deployment to a cluster. -->
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                    <archive>
+                        <manifest>
+                            <mainClass></mainClass>
+                        </manifest>
+                    </archive>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-source-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>attach-sources</id>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+        <pluginManagement>
+            <plugins>
+                <!-- This plugin's configuration is used to store Eclipse m2e settings
+                    only. It has no influence on the Maven build itself. -->
+                <plugin>
+                    <groupId>org.eclipse.m2e</groupId>
+                    <artifactId>lifecycle-mapping</artifactId>
+                    <version>1.0.0</version>
+                    <configuration>
+                        <lifecycleMappingMetadata>
+                            <pluginExecutions>
+                                <pluginExecution>
+                                    <pluginExecutionFilter>
+                                        <groupId>
+                                            com.theoryinpractise
+                                        </groupId>
+                                        <artifactId>
+                                            clojure-maven-plugin
+                                        </artifactId>
+                                        <versionRange>
+                                            [1.3.8,)
+                                        </versionRange>
+                                        <goals>
+                                            <goal>compile</goal>
+                                        </goals>
+                                    </pluginExecutionFilter>
+                                    <action>
+                                        <ignore></ignore>
+                                    </action>
+                                </pluginExecution>
+                            </pluginExecutions>
+                        </lifecycleMappingMetadata>
+                    </configuration>
+                </plugin>
+            </plugins>
+        </pluginManagement>
+    </build>
+</project>
\ No newline at end of file
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/commons/msg/AttributeConstants.java
 
b/inlong-common/src/main/java/org/apache/inlong/commons/msg/AttributeConstants.java
new file mode 100644
index 0000000..857d7f0
--- /dev/null
+++ 
b/inlong-common/src/main/java/org/apache/inlong/commons/msg/AttributeConstants.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.commons.msg;
+
+/**
+ * String constants for the keys and separators of message attribute strings.
+ */
+public interface AttributeConstants {
+
+    /** Separator placed between two key/value pairs. */
+    String SEPARATOR = "&";
+    /** Separator placed between a key and its value. */
+    String KEY_VALUE_SEPARATOR = "=";
+
+    /**
+     * Business id:
+     * a unique string id for each business or product.
+     */
+    String BUSINESS_ID = "bid";
+
+    /**
+     * Interface id:
+     * a unique string id for each interface of a business.
+     * An interface stands for one kind of data.
+     */
+    String INTERFACE_ID = "tid";
+
+    /**
+     * iname is like a tid, but is used in the file protocol (m=xxx).
+     */
+    String INAME = "iname";
+
+    /**
+     * Data time.
+     */
+    String DATA_TIME = "dt";
+
+    /* presumably a timestamp, judging by the constant name - TODO confirm */
+    String TIME_STAMP = "t";
+
+    /* compression type */
+    String COMPRESS_TYPE = "cp";
+
+    /* number of records contained in a message body */
+    String MESSAGE_COUNT = "cnt";
+
+    /* message type */
+    String MESSAGE_TYPE = "mt";
+
+    /* sort type */
+    String METHOD = "m";
+
+    /* globally unique id of a message */
+    String SEQUENCE_ID = "sid";
+
+    /* NOTE(review): relation to SEQUENCE_ID is not documented - confirm */
+    String UNIQ_ID = "uniq";
+
+    /* where the message comes from */
+    String FROM = "f";
+
+    /* presumably the receive time, judging by the constant name - TODO confirm */
+    String RCV_TIME = "rt";
+
+    /* presumably the IP of the handling node, judging by the constant name - TODO confirm */
+    String NODE_IP = "NodeIP";
+
+}
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/commons/msg/DataInputBuffer.java
 
b/inlong-common/src/main/java/org/apache/inlong/commons/msg/DataInputBuffer.java
new file mode 100644
index 0000000..352b8a8
--- /dev/null
+++ 
b/inlong-common/src/main/java/org/apache/inlong/commons/msg/DataInputBuffer.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.commons.msg;
+
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+
+/**
+ * A reusable {@link java.io.DataInput} implementation that reads from an
+ * in-memory buffer.
+ *
+ * <p>This saves memory over creating a new DataInputStream and
+ * ByteArrayInputStream each time data is read.
+ *
+ * <p>The byte array handed to {@code reset} is used directly and is not
+ * copied.
+ *
+ * <p>Typical usage is something like the following:<pre>
+ *
+ * DataInputBuffer buffer = new DataInputBuffer();
+ * while (... loop condition ...) {
+ *   byte[] data = ... get data ...;
+ *   int dataLength = ... get data length ...;
+ *   buffer.reset(data, dataLength);
+ *   ... read buffer using DataInput methods ...
+ * }
+ * </pre>
+ */
+public class DataInputBuffer extends DataInputStream {
+    /**
+     * ByteArrayInputStream whose backing array and read window can be
+     * swapped, by writing the protected fields of the superclass.
+     */
+    private static class Buffer extends ByteArrayInputStream {
+        public Buffer() {
+            super(new byte[]{
+                    //
+            });
+        }
+
+        /**
+         * Points the stream at {@code input}; reading starts at index
+         * {@code start} and stops before index {@code start + length}.
+         */
+        public void reset(byte[] input, int start, int length) {
+            this.buf = input;
+            this.count = start + length;
+            this.mark = start;
+            this.pos = start;
+        }
+
+        public byte[] getData() {
+            return buf;
+        }
+
+        public int getPosition() {
+            return pos;
+        }
+
+        public int getLength() {
+            return count;
+        }
+    }
+
+    private Buffer buffer;
+
+    /**
+     * Constructs a new empty buffer.
+     */
+    public DataInputBuffer() {
+        this(new Buffer());
+    }
+
+    // keeps a typed reference to the same Buffer that the superclass reads from
+    private DataInputBuffer(Buffer buffer) {
+        super(buffer);
+        this.buffer = buffer;
+    }
+
+    /**
+     * Resets the data that the buffer reads to the first {@code length}
+     * bytes of {@code input}.
+     */
+    public void reset(byte[] input, int length) {
+        buffer.reset(input, 0, length);
+    }
+
+    /**
+     * Resets the data that the buffer reads to {@code length} bytes of
+     * {@code input}, beginning at index {@code start}.
+     */
+    public void reset(byte[] input, int start, int length) {
+        buffer.reset(input, start, length);
+    }
+
+    /**
+     * Returns the backing array itself (not a copy).
+     */
+    public byte[] getData() {
+        return buffer.getData();
+    }
+
+    /**
+     * Returns the current position in the input, as an index into the
+     * backing array.
+     */
+    public int getPosition() {
+        return buffer.getPosition();
+    }
+
+    /**
+     * Returns the end index of the readable data in the backing array,
+     * i.e. {@code start + length} of the last reset. This equals the data
+     * length only when the data starts at index 0.
+     */
+    public int getLength() {
+        return buffer.getLength();
+    }
+
+}
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/commons/msg/DataOutputBuffer.java
 
b/inlong-common/src/main/java/org/apache/inlong/commons/msg/DataOutputBuffer.java
new file mode 100644
index 0000000..207800f
--- /dev/null
+++ 
b/inlong-common/src/main/java/org/apache/inlong/commons/msg/DataOutputBuffer.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.commons.msg;
+
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A reusable {@link java.io.DataOutput} implementation that writes to an
+ * in-memory buffer.
+ *
+ * <p>This saves memory over creating a new DataOutputStream and
+ * ByteArrayOutputStream each time data is written.
+ *
+ * <p>Typical usage is something like the following:<pre>
+ *
+ * DataOutputBuffer buffer = new DataOutputBuffer();
+ * while (... loop condition ...) {
+ *   buffer.reset();
+ *   ... write buffer using DataOutput methods ...
+ *   byte[] data = buffer.getData();
+ *   int dataLength = buffer.getLength();
+ *   ... write data to its ultimate destination ...
+ * }
+ * </pre>
+ */
+public class DataOutputBuffer extends DataOutputStream {
+
+    /**
+     * ByteArrayOutputStream that exposes its internal array and valid
+     * length without copying.
+     */
+    private static class Buffer extends ByteArrayOutputStream {
+
+        public byte[] getData() {
+            return buf;
+        }
+
+        public int getLength() {
+            return count;
+        }
+
+        public Buffer() {
+            super();
+        }
+
+        public Buffer(int size) {
+            super(size);
+        }
+
+        /**
+         * Reads exactly {@code len} bytes from {@code in} and appends them.
+         */
+        public void write(DataInput in, int len) throws IOException {
+            int newcount = count + len;
+            if (newcount > buf.length) {
+                // grow to at least double the current size, or more if required
+                byte[] newbuf = new byte[Math.max(buf.length << 1, newcount)];
+                System.arraycopy(buf, 0, newbuf, 0, count);
+                buf = newbuf;
+            }
+            in.readFully(buf, count, len);
+            count = newcount;
+        }
+    }
+
+    private Buffer buffer;
+
+    /**
+     * Constructs a new empty buffer.
+     */
+    public DataOutputBuffer() {
+        this(new Buffer());
+    }
+
+    /**
+     * Constructs a new empty buffer with the given initial size in bytes.
+     */
+    public DataOutputBuffer(int size) {
+        this(new Buffer(size));
+    }
+
+    // keeps a typed reference to the same Buffer that the superclass writes to
+    private DataOutputBuffer(Buffer buffer) {
+        super(buffer);
+        this.buffer = buffer;
+    }
+
+    /**
+     * Returns the internal array holding the buffer contents (not a copy).
+     * Data is only valid to {@link #getLength()}.
+     */
+    public byte[] getData() {
+        return buffer.getData();
+    }
+
+    /**
+     * Returns the length of the valid data currently in the buffer.
+     */
+    public int getLength() {
+        return buffer.getLength();
+    }
+
+    /**
+     * Resets the buffer to empty and zeroes the inherited written-bytes
+     * counter.
+     *
+     * @return this buffer
+     */
+    public DataOutputBuffer reset() {
+        this.written = 0;
+        buffer.reset();
+        return this;
+    }
+
+    /**
+     * Writes bytes from a DataInput directly into the buffer.
+     */
+    public void write(DataInput in, int length) throws IOException {
+        buffer.write(in, length);
+    }
+
+    /**
+     * Writes the valid contents of the buffer to the given output stream.
+     */
+    public void writeTo(OutputStream out) throws IOException {
+        buffer.writeTo(out);
+    }
+}
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/commons/msg/TDMsg1.java 
b/inlong-common/src/main/java/org/apache/inlong/commons/msg/TDMsg1.java
new file mode 100644
index 0000000..8919f99
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/msg/TDMsg1.java
@@ -0,0 +1,1108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.commons.msg;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.xerial.snappy.Snappy;
+
+public class TDMsg1 {
+    private static final int DEFAULT_CAPACITY = 4096;
+    private final int capacity;
+
+    private static final int BIN_MSG_NO_ZIP = 0;
+    private static final int BIN_MSG_SNAPPY_TYPE = 1;
+
+    private static final int BIN_MSG_TOTALLEN_OFFSET = 0;
+    private static final int BIN_MSG_BID_OFFSET = 5;
+    private static final int BIN_MSG_TID_OFFSET = 7;
+    private static final int BIN_MSG_EXTFIELD_OFFSET = 9;
+    private static final int BIN_MSG_COUNT_OFFSET = 15;
+    private static final int BIN_MSG_DATATIME_OFFSET = 11;
+    private static final int BIN_MSG_TOTALLEN_SIZE = 4;
+    private static final int BIN_MSG_MSGTYPE_OFFSET = 4;
+    private static final int BIN_MSG_SET_SNAPPY = (1 << 5);
+    private static final int BIN_MSG_BODYLEN_SIZE = 4;
+    private static final int BIN_MSG_BODYLEN_OFFSET = 21;
+    private static final int BIN_MSG_BODY_OFFSET =
+            BIN_MSG_BODYLEN_SIZE + BIN_MSG_BODYLEN_OFFSET;
+    private static final int BIN_MSG_ATTRLEN_SIZE = 2;
+    private static final int BIN_MSG_FORMAT_SIZE = 29;
+    private static final int BIN_MSG_MAGIC_SIZE = 2;
+    private static final int BIN_MSG_MAGIC = 0xEE01;
+
+    private static final byte[] MAGIC0 = {(byte) 0xf, (byte) 0x0};
+    // with timestamp
+    private static final byte[] MAGIC1 = {(byte) 0xf, (byte) 0x1};
+    // with msg cnt 20130619
+    private static final byte[] MAGIC2 = {(byte) 0xf, (byte) 0x2};
+    // support msg_type = 6
+    private static final byte[] MAGIC3 = {(byte) 0xf, (byte) 0x3};
+    // support binmsg
+    private static final byte[] MAGIC4 = {(byte) 0xf, (byte) 0x4};
+
+    private final boolean addmode;
+
+    private static final Joiner.MapJoiner MAP_JOINER =
+            Joiner.on(AttributeConstants.SEPARATOR)
+                    
.withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR);
+    private static final Splitter.MapSplitter MAP_SPLITTER =
+            Splitter.on(AttributeConstants.SEPARATOR)
+                    
.trimResults().withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR);
+
+    /**
+     * Accumulates length-prefixed records in an output buffer and counts
+     * how many records were written.
+     */
+    static class DataBuffer {
+
+        DataOutputBuffer out;
+        // number of records written so far
+        int cnt;
+
+        public DataBuffer() {
+            out = new DataOutputBuffer();
+        }
+
+        /**
+         * Appends one record: a 4-byte length followed by {@code len} bytes
+         * of {@code array} starting at index {@code position}.
+         */
+        public void write(byte[] array, int position, int len)
+                throws IOException {
+            cnt++;
+            out.writeInt(len);
+            out.write(array, position, len);
+        }
+    }
+
+    private LinkedHashMap<String, DataBuffer> attr2MsgBuffer;
+    private ByteBuffer binMsgBuffer;
+    private int datalen = 0;
+    private int msgcnt = 0;
+    private boolean compress;
+    private boolean isNumBid = false;
+    private boolean ischeck = true;
+
+    private final Version version;
+    private long timeoffset = 0;
+
+    /**
+     * Sets the offset that {@code build()} adds to the current system time
+     * (milliseconds) when it computes the create time.
+     */
+    public void setTimeoffset(long offset) {
+        this.timeoffset = offset;
+    }
+
+    private enum Version {
+        vn(-1), v0(0), v1(1),
+        v2(2), v3(3), v4(4);
+
+        private static final Map<Integer, Version> INT_TO_TYPE_MAP =
+                new HashMap<Integer, Version>();
+
+        static {
+            for (Version type : Version.values()) {
+                INT_TO_TYPE_MAP.put(type.value, type);
+            }
+        }
+
+        private final int value;
+
+        private Version(int value) {
+            this.value = value;
+        }
+
+        public int intValue() {
+            return value;
+        }
+
+        public static Version of(int v) {
+            if (!INT_TO_TYPE_MAP.containsKey(v)) {
+                return vn;
+            }
+            return INT_TO_TYPE_MAP.get(v);
+        }
+
+    }
+
+    /**
+     * Creates a message with capacity 4096, compression enabled, version 1.
+     *
+     * @return a new TDMsg1
+     */
+    public static TDMsg1 newTDMsg() {
+        return newTDMsg(true);
+    }
+
+    /**
+     * Creates a message with capacity 4096, version 1.
+     *
+     * @param compress whether to compress the data
+     * @return a new TDMsg1
+     */
+    public static TDMsg1 newTDMsg(boolean compress) {
+        return newTDMsg(DEFAULT_CAPACITY, compress);
+    }
+
+    /**
+     * Creates a message with capacity 4096, compression enabled.
+     *
+     * @param v version number
+     * @return a new TDMsg1
+     */
+    public static TDMsg1 newTDMsg(int v) {
+        return newTDMsg(DEFAULT_CAPACITY, true, v);
+    }
+
+    /**
+     * Creates a message with capacity 4096.
+     *
+     * @param compress whether to compress the data
+     * @param v        version number
+     * @return a new TDMsg1
+     */
+    public static TDMsg1 newTDMsg(boolean compress, int v) {
+        return newTDMsg(DEFAULT_CAPACITY, compress, v);
+    }
+
+    /**
+     * Creates a version 1 message.
+     *
+     * @param capacity data capacity
+     * @param compress whether to compress the data
+     * @return a new TDMsg1
+     */
+    public static TDMsg1 newTDMsg(int capacity, boolean compress) {
+        return new TDMsg1(capacity, compress, Version.v1);
+    }
+
+    /**
+     * Creates a message with every setting given explicitly.
+     *
+     * @param capacity data capacity
+     * @param compress whether to compress the data
+     * @param v        version number; an unrecognised number maps to
+     *                 {@code Version.vn}
+     * @return a new TDMsg1
+     */
+    public static TDMsg1 newTDMsg(int capacity, boolean compress, int v) {
+        return new TDMsg1(capacity, compress, Version.of(v));
+    }
+
+    // Constructor used when creating (building) a message: the instance is
+    // put in add mode and starts with an empty attribute-to-buffer map.
+    private TDMsg1(int capacity, boolean compress, Version v) {
+        version = v;
+        addmode = true;
+        this.compress = compress;
+        this.capacity = capacity;
+        attr2MsgBuffer = new LinkedHashMap<String, DataBuffer>();
+        parsedInput = null;
+        reset();
+    }
+
+    /**
+     * Adds one message under the given attribute string.
+     *
+     * <p>A return value of false normally means the message has grown to
+     * its capacity and no further data should be added. Note that in this
+     * case the input data HAS already been added, and adding more data
+     * afterwards still succeeds. False is also returned, without adding the
+     * data, when a version 3 content check fails or the write fails.
+     *
+     * @param attr   attribute info
+     * @param data   binary data
+     * @param offset data start offset
+     * @param len    data length
+     * @return true while the accumulated length is below the capacity
+     */
+    public boolean addMsg(String attr, byte[] data, int offset, int len) {
+        return addMsg(attr, ByteBuffer.wrap(data, offset, len));
+    }
+
+    /**
+     * Adds the remaining bytes of {@code data} as one message under the
+     * given attribute string. The position of {@code data} is not changed.
+     *
+     * <p>Array-backed, direct and read-only buffers are all accepted.
+     *
+     * @param attr attribute info; messages with equal attribute strings
+     *             share one buffer
+     * @param data message bytes, from its position to its limit
+     * @return true while the accumulated length is below the capacity;
+     *         false once the capacity is reached (the data has still been
+     *         added), or when the version 3 content check or the write
+     *         fails (the data has not been added)
+     */
+    public boolean addMsg(String attr, ByteBuffer data) {
+        checkMode(true);
+
+        if ((version.intValue() == Version.v3.intValue())
+                && !checkData(data)) {
+            return false;
+        }
+
+        DataBuffer outputBuffer = attr2MsgBuffer.get(attr);
+        if (outputBuffer == null) {
+            outputBuffer = new DataBuffer();
+            attr2MsgBuffer.put(attr, outputBuffer);
+            // attrlen + utflen + meglen + compress
+            this.datalen += attr.length() + 2 + 4 + 1;
+        }
+
+        int len = data.remaining();
+        try {
+            if (data.hasArray()) {
+                // position() is relative to arrayOffset(), which is non-zero
+                // for sliced buffers
+                outputBuffer.write(data.array(),
+                        data.arrayOffset() + data.position(), len);
+            } else {
+                // direct or read-only buffer: copy through a duplicate so
+                // the caller's position stays untouched
+                byte[] copy = new byte[len];
+                data.duplicate().get(copy);
+                outputBuffer.write(copy, 0, len);
+            }
+            this.datalen += len + 4;
+            if (version.intValue() == Version.v2.intValue()) {
+                this.datalen += 4;
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+            return false;
+        }
+        msgcnt++;
+        return checkLen(attr, len);
+    }
+
+    /**
+     * Adds the whole byte array as one message under the given attribute
+     * string; delegates to the ByteBuffer overload.
+     */
+    public boolean addMsg(String attr, byte[] data) {
+        return addMsg(attr, ByteBuffer.wrap(data));
+    }
+
+    /* Newly added: support for the new data message format */
+    public boolean addMsg(byte[] data) {
+        return addMsg(ByteBuffer.wrap(data));
+    }
+
+    public boolean addMsg(ByteBuffer data) {
+        if (!checkBinData(data)) {
+            return false;
+        }
+
+        if (binMsgBuffer != null) {
+            return false;
+        }
+
+        binMsgBuffer = ByteBuffer.allocate(data.remaining());
+        binMsgBuffer.put(data);
+
+        binMsgBuffer.position(BIN_MSG_TOTALLEN_OFFSET);
+        msgcnt = getBinMsgCnt(binMsgBuffer);
+        return true;
+    }
+
+    // Reads the message-type byte (absolute index BIN_MSG_MSGTYPE_OFFSET);
+    // the byte is sign-extended to int.
+    private int getBinMsgtype(ByteBuffer data) {
+        return data.get(BIN_MSG_MSGTYPE_OFFSET);
+    }
+
+    // Reads the message count as a signed 16-bit value at absolute index
+    // BIN_MSG_COUNT_OFFSET.
+    private int getBinMsgCnt(ByteBuffer data) {
+        return data.getShort(BIN_MSG_COUNT_OFFSET);
+    }
+
+    // Reads the 32-bit data-time field at absolute index
+    // BIN_MSG_DATATIME_OFFSET and multiplies it by 1000
+    // (presumably seconds to milliseconds - TODO confirm).
+    private long getBinCreatetime(ByteBuffer data) {
+        return data.getInt(BIN_MSG_DATATIME_OFFSET) * 1000L;
+    }
+
+    // Returns true when bit 0x4 of the 16-bit extension field (absolute
+    // index BIN_MSG_EXTFIELD_OFFSET) is NOT set.
+    private boolean getBinNumFlag(ByteBuffer data) {
+        return (data.getShort(BIN_MSG_EXTFIELD_OFFSET) & 0x4) == 0;
+    }
+
+    /**
+     * Checks that a binary message is well formed: every length field must
+     * lie inside the buffer, the total length must agree with the body and
+     * attribute lengths, and the trailing magic number must match.
+     *
+     * <p>All reads use absolute indices (counted from buffer index 0, not
+     * from the current position), so the position of {@code data} is not
+     * changed. Truncated or corrupt input yields false rather than an
+     * exception.
+     *
+     * @param data the binary message
+     * @return true if the message passes every check
+     */
+    private boolean checkBinData(ByteBuffer data) {
+        // check message validity
+        int limit = data.limit();
+        // the fixed header, up to and including the body-length field,
+        // must be present
+        if (limit < BIN_MSG_BODY_OFFSET) {
+            return false;
+        }
+        int totalLen = data.getInt(BIN_MSG_TOTALLEN_OFFSET);
+        int bodyLen = data.getInt(BIN_MSG_BODYLEN_OFFSET);
+
+        // long arithmetic, so a corrupt length cannot overflow into an
+        // index that looks valid
+        long attrLenPos = (long) BIN_MSG_BODY_OFFSET + bodyLen;
+        if (bodyLen < 0 || attrLenPos + BIN_MSG_ATTRLEN_SIZE > limit) {
+            return false;
+        }
+        int attrLen = data.getShort((int) attrLenPos);
+
+        long magicPos = attrLenPos + BIN_MSG_ATTRLEN_SIZE + attrLen;
+        if (attrLen < 0 || magicPos + BIN_MSG_MAGIC_SIZE > limit) {
+            return false;
+        }
+        int msgMagic = data.getShort((int) magicPos) & 0xFFFF;
+
+        return (totalLen + BIN_MSG_TOTALLEN_SIZE == (bodyLen + attrLen + BIN_MSG_FORMAT_SIZE))
+                && (msgMagic == BIN_MSG_MAGIC);
+    }
+
+    /**
+     * Adds every message yielded by iterating over {@code data} under the
+     * given attribute string. For version 3 messages the per-message
+     * content check is switched off while adding and is always switched
+     * back on afterwards, even if adding throws.
+     *
+     * @param attr attribute info
+     * @param data buffer holding the messages to add
+     * @return true only if every single add returned true
+     */
+    public boolean addMsgs(String attr, ByteBuffer data) {
+        boolean res = true;
+        Iterator<ByteBuffer> it = getIteratorBuffer(data);
+        setCheckMode(false);
+        try {
+            while (it.hasNext()) {
+                // addMsg comes first so that it is always invoked; a failure
+                // of an earlier add is no longer masked by a later success
+                res = this.addMsg(attr, it.next()) && res;
+            }
+        } finally {
+            setCheckMode(true);
+        }
+        return res;
+    }
+
+    // Switches the content check on or off; has an effect for version 3
+    // messages only.
+    private void setCheckMode(boolean mode) {
+        if (version != Version.v3) {
+            return;
+        }
+        ischeck = mode;
+    }
+
+    /**
+     * Version 3 message: checks that the data content is a sequence of
+     * length-prefixed records (4-byte length, then that many bytes).
+     *
+     * <p>The scan runs on a duplicate of {@code data}, so the caller's
+     * position and mark are never changed, whether the check passes or
+     * fails. When the data holds more than one record pair, the extra
+     * pairs are added to the message count.
+     *
+     * @param data the data to check
+     * @return false if a length prefix is truncated, negative, or larger
+     *         than the bytes that follow it; true otherwise, or when
+     *         checking is switched off
+     */
+    private boolean checkData(ByteBuffer data) {
+        if ((version.intValue() == Version.v3.intValue()) && !ischeck) {
+            return true;
+        }
+
+        // check data; a duplicate does not inherit the byte order, so copy it
+        ByteBuffer scan = data.duplicate();
+        scan.order(data.order());
+        int msgnum = 0;
+        while (scan.remaining() > 0) {
+            if (scan.remaining() < 4) {
+                // truncated length prefix
+                return false;
+            }
+            int recordLen = scan.getInt();
+            if (recordLen < 0 || recordLen > scan.remaining()) {
+                return false;
+            }
+            msgnum++;
+            // skip the record body without copying it
+            scan.position(scan.position() + recordLen);
+        }
+
+        msgnum = msgnum / 2;
+        if (msgnum > 1) {
+            msgcnt += msgnum - 1;
+        }
+        return true;
+    }
+
+    // Returns true while the accumulated data length is still below the
+    // capacity. Neither parameter is used by the check.
+    private boolean checkLen(String attr, int len) {
+        return datalen < capacity;
+    }
+
+    /**
+     * Tells whether the accumulated data length has reached the capacity.
+     *
+     * @return true if no further data should be added
+     */
+    public boolean isfull() {
+        checkMode(true);
+        return datalen >= capacity;
+    }
+
+    /**
+     * Serialises the buffered messages: the header, the number of attribute
+     * groups, then for each group its attribute string, (version 2 only)
+     * its record count, the payload length plus one, the compress flag and
+     * the payload, and finally the magic bytes. With compression enabled
+     * each payload is Snappy-compressed.
+     *
+     * @param createtime create time to record for this message
+     * @return a buffer wrapping the serialised bytes, or null if writing
+     *         fails
+     */
+    private ByteBuffer defaultBuild(long createtime) {
+        try {
+            this.createtime = createtime;
+            DataOutputBuffer out = new DataOutputBuffer(capacity);
+
+            writeHeader(out);
+            out.writeInt(attr2MsgBuffer.size());
+
+            for (Map.Entry<String, DataBuffer> group : attr2MsgBuffer.entrySet()) {
+                out.writeUTF(group.getKey());
+                DataBuffer records = group.getValue();
+                if (version == Version.v2) {
+                    out.writeInt(records.cnt);
+                }
+
+                byte[] payload = records.out.getData();
+                int payloadLen = records.out.getLength();
+                if (compress) {
+                    // compress into a scratch array sized for the worst case
+                    byte[] packed =
+                            new byte[Snappy.maxCompressedLength(payloadLen)];
+                    payloadLen = Snappy.compress(payload, 0,
+                            payloadLen, packed, 0);
+                    payload = packed;
+                }
+
+                // the length covers the one-byte compress flag as well
+                out.writeInt(payloadLen + 1);
+                out.writeBoolean(compress);
+                out.write(payload, 0, payloadLen);
+            }
+
+            writeMagic(out);
+            out.close();
+            return ByteBuffer.wrap(out.getData(), 0, out.getLength());
+        } catch (IOException e) {
+            e.printStackTrace();
+            return null;
+        }
+    }
+
+    /**
+     * Serialises the stored binary message, framed by the magic bytes at
+     * the start and at the end. If the message's type byte carries no
+     * compress type and compression is enabled, the body is
+     * Snappy-compressed and the length fields and compress flag are
+     * rewritten; otherwise the stored bytes are written unchanged.
+     *
+     * <p>NOTE(review): binMsgBuffer is dereferenced without a null check;
+     * presumably a binary message must have been added before building -
+     * confirm against callers.
+     *
+     * @param createtime create time to record for this message
+     * @return a buffer wrapping the serialised bytes, or null if writing
+     *         fails
+     */
+    private ByteBuffer binBuild(long createtime) {
+        try {
+            this.createtime = createtime;
+            DataOutputBuffer out = new DataOutputBuffer(capacity);
+
+            writeMagic(out);
+
+            int msgType = getBinMsgtype(binMsgBuffer);
+            // the compress type lives in the top three bits of the type byte
+            int compressType = ((msgType & 0xE0) >> 5);
+            if ((compressType == 0) && (compress)) {
+
+                binMsgBuffer.position(BIN_MSG_BODYLEN_OFFSET);
+                // copy body data
+                int bodyLen = binMsgBuffer.getInt();
+                byte[] body = new byte[bodyLen];
+                binMsgBuffer.get(body, 0, bodyLen);
+
+                // copy attributes
+                int attrLen =
+                        binMsgBuffer.getShort(BIN_MSG_BODY_OFFSET + bodyLen);
+                byte[] attr =
+                        new byte[BIN_MSG_ATTRLEN_SIZE + attrLen + 
BIN_MSG_MAGIC_SIZE];
+                // relative read: the position is just past the body here, so
+                // this takes the attribute length, attributes and magic
+                binMsgBuffer.get(attr, 0, attr.length);
+
+                int guessLen = Snappy.maxCompressedLength(bodyLen);
+                byte[] tmpData = new byte[guessLen];
+                int realLen = Snappy.compress(body, 0,
+                        body.length, tmpData, 0);
+
+                int totalDataLen = 
binMsgBuffer.getInt(BIN_MSG_TOTALLEN_OFFSET);
+                // same size as the original message, with the compressed
+                // body in place of the raw one
+                ByteBuffer dataBuf = ByteBuffer.allocate(
+                        totalDataLen + BIN_MSG_TOTALLEN_SIZE - body.length + 
realLen);
+
+                // copy headers
+                dataBuf.put(binMsgBuffer.array(), 0, BIN_MSG_BODYLEN_OFFSET);
+                // set compress flag
+                dataBuf.put(BIN_MSG_MSGTYPE_OFFSET, (byte) (msgType | 
BIN_MSG_SET_SNAPPY));
+                dataBuf.putInt(BIN_MSG_TOTALLEN_OFFSET,
+                        realLen + attrLen + BIN_MSG_FORMAT_SIZE - 4);
+                // set data length
+                dataBuf.putInt(BIN_MSG_BODYLEN_OFFSET, realLen);
+                // fill compressed data
+                System.arraycopy(tmpData, 0,
+                        dataBuf.array(), BIN_MSG_BODY_OFFSET, realLen);
+                // fill attributes and MAGIC
+                System.arraycopy(attr, 0, dataBuf.array(),
+                        BIN_MSG_BODY_OFFSET + realLen, attr.length);
+
+                out.write(dataBuf.array(), 0, dataBuf.capacity());
+            } else {
+                // already compressed, or compression disabled: write as is
+                out.write(binMsgBuffer.array(), 0, binMsgBuffer.capacity());
+            }
+
+            writeMagic(out);
+            out.close();
+            return ByteBuffer.wrap(out.getData(), 0, out.getLength());
+        } catch (IOException e) {
+            e.printStackTrace();
+            return null;
+        }
+    }
+
+    /**
+     * Serializes the message using the current wall-clock time, shifted by
+     * {@code timeoffset}, as the creation time.
+     *
+     * @return the result of {@link #build(long)} for that timestamp
+     */
+    public ByteBuffer build() {
+        final long stamp = System.currentTimeMillis() + timeoffset;
+        return build(stamp);
+    }
+
+    /**
+     * Serializes the message with an explicit creation time. Only legal in
+     * add mode; version v4 goes through the binary builder, every other
+     * version through the default builder.
+     *
+     * @param createtime creation time handed to the selected builder
+     * @return whatever the selected builder returns
+     * @throws RuntimeException if the instance is in parse mode
+     */
+    public ByteBuffer build(long createtime) {
+        checkMode(true);
+        final boolean binaryFormat =
+                version.intValue() == Version.v4.intValue();
+        return binaryFormat ? binBuild(createtime) : defaultBuild(createtime);
+    }
+
+    /**
+     * Writes the leading magic and, for every version except v4, the
+     * version-dependent header fields: the creation time from v1 on and the
+     * total message count from v2 on.
+     *
+     * @param out destination buffer
+     * @throws IOException if writing fails or the version is not recognised
+     */
+    private void writeHeader(DataOutputBuffer out) throws IOException {
+        writeMagic(out);
+
+        final int current = version.intValue();
+        if (current == Version.v4.intValue()) {
+            // the binary format carries no further header fields here
+            return;
+        }
+
+        if (current >= Version.v1.intValue()) {
+            out.writeLong(createtime);
+        }
+        if (current >= Version.v2.intValue()) {
+            out.writeInt(this.getMsgCnt());
+        }
+    }
+
+    /**
+     * Emits the two magic bytes that belong to the current version.
+     *
+     * @param out destination buffer
+     * @throws IOException if the version is none of v1..v4, or writing fails
+     */
+    private void writeMagic(DataOutputBuffer out) throws IOException {
+        if (version == Version.v1) {
+            out.write(MAGIC1[0]);
+            out.write(MAGIC1[1]);
+            return;
+        }
+        if (version == Version.v2) {
+            out.write(MAGIC2[0]);
+            out.write(MAGIC2[1]);
+            return;
+        }
+        if (version == Version.v3) {
+            out.write(MAGIC3[0]);
+            out.write(MAGIC3[1]);
+            return;
+        }
+        if (version == Version.v4) {
+            out.write(MAGIC4[0]);
+            out.write(MAGIC4[1]);
+            return;
+        }
+        throw new IOException("wrong version : " + version.intValue());
+    }
+
+    /**
+     * Serializes the message into a fresh byte array, using the current
+     * wall-clock time shifted by {@code timeoffset} as the creation time.
+     *
+     * @return the result of {@link #buildArray(long)} for that timestamp
+     */
+    public byte[] buildArray() {
+        final long stamp = System.currentTimeMillis() + timeoffset;
+        return buildArray(stamp);
+    }
+
+    /**
+     * Serializes the message and copies the readable bytes of the resulting
+     * buffer into a new array.
+     *
+     * @param createtime creation time passed on to {@link #build(long)}
+     * @return the copied bytes, or {@code null} when {@link #build(long)}
+     *         returned {@code null}
+     */
+    public byte[] buildArray(long createtime) {
+        final ByteBuffer built = this.build(createtime);
+        if (built != null) {
+            final byte[] copy = new byte[built.remaining()];
+            System.arraycopy(built.array(), built.position(), copy, 0, copy.length);
+            return copy;
+        }
+        return null;
+    }
+
+    /**
+     * Discards everything accumulated in add mode: empties the per-attribute
+     * buffers, sets the data length back to the bare header length and
+     * zeroes the message counter.
+     *
+     * @throws RuntimeException if the instance is in parse mode
+     */
+    public void reset() {
+        checkMode(true);
+        attr2MsgBuffer.clear();
+        datalen = getHeaderLen();
+        this.msgcnt = 0;
+    }
+
+    /**
+     * Computes the fixed per-message overhead in bytes for the current
+     * version: 4 bytes of magic, 4 bytes of attribute count, 8 bytes of
+     * creation time from v1 on, and 4 bytes of message count for v2.
+     *
+     * <p>NOTE(review): the message count is added only when the version is
+     * exactly v2, while the header writer emits that field for every version
+     * from v2 upwards - confirm whether {@code ==} is intended here.
+     *
+     * @return the header length in bytes
+     */
+    private int getHeaderLen() {
+        final int current = version.intValue();
+        int total = 4 + 4; // magic + attrcnt
+        if (current >= Version.v1.intValue()) {
+            total += 8; // create time
+        }
+        if (current == Version.v2.intValue()) {
+            total += 4; // msgcnt
+        }
+        return total;
+    }
+
+    /**
+     * Returns the total message counter. Usable in both add and parse mode.
+     *
+     * @return the current value of {@code msgcnt}
+     */
+    public int getMsgCnt() {
+        return this.msgcnt;
+    }
+
+    /**
+     * Returns the message count recorded for one attribute.
+     *
+     * <p>In add mode the count comes from the attribute's output buffer. In
+     * parse mode the payload is parsed first (the other parse-mode accessors
+     * do the same), because the attribute map does not exist before parsing;
+     * without this step a call on a freshly parsed message failed with a
+     * {@link NullPointerException}.
+     *
+     * @param attr attribute string to look up
+     * @return the count stored for {@code attr}
+     * @throws NullPointerException if {@code attr} is not present
+     */
+    public int getMsgCnt(String attr) {
+        if (addmode) {
+            return this.attr2MsgBuffer.get(attr).cnt;
+        }
+        // attr2Rawdata is only populated by parse()
+        makeSureParsed();
+        return this.attr2Rawdata.get(attr).cnt;
+    }
+
+    /**
+     * Guards an operation that is only valid in one of the two modes.
+     *
+     * @param add {@code true} when the caller requires add mode,
+     *            {@code false} when it requires parse mode
+     * @throws RuntimeException if the instance is in the other mode
+     */
+    private void checkMode(boolean add) {
+        if (addmode == add) {
+            return;
+        }
+        final String reason;
+        if (addmode) {
+            reason = "illegal operation in add mode !!!";
+        } else {
+            reason = "illegal operation in parse mode !!!";
+        }
+        throw new RuntimeException(reason);
+    }
+
+    // Number of attribute entries read from the header of a parsed message;
+    // stays -1 until the parsing constructor reads it (it is not read for v4).
+    private int attrcnt = -1;
+
+    /**
+     * Holder for the raw data of one attribute in parse mode: a message
+     * count plus either a ready-made {@link ByteBuffer} view or a growable
+     * output buffer that is turned into a view by {@link #syncByteBuffer()}.
+     */
+    static class DataByteBuffer {
+        /** Message count recorded for the attribute. */
+        final int cnt;
+        /** Readable view of the raw data; unset until provided or synced. */
+        ByteBuffer buffer;
+        /** Accumulator used when records are appended one by one. */
+        DataOutputBuffer inoutBuffer;
+
+        /** Creates a holder around an existing view. */
+        public DataByteBuffer(int cnt, ByteBuffer buffer) {
+            this.buffer = buffer;
+            this.cnt = cnt;
+        }
+
+        /** Creates a holder around an accumulator; the view is built later. */
+        public DataByteBuffer(int cnt, DataOutputBuffer inoutbuffer) {
+            this.inoutBuffer = inoutbuffer;
+            this.cnt = cnt;
+        }
+
+        /** Re-points {@code buffer} at the bytes accumulated so far. */
+        public void syncByteBuffer() {
+            final byte[] bytes = inoutBuffer.getData();
+            final int length = inoutBuffer.getLength();
+            this.buffer = ByteBuffer.wrap(bytes, 0, length);
+        }
+    }
+
+    // Parse mode: attribute string -> raw data holder; null until parse() ran.
+    private LinkedHashMap<String, DataByteBuffer> attr2Rawdata = null;
+
+    // not used right now
+    // private LinkedHashMap<String, Integer> attr2index = null;
+    // Creation time; -1 until set by a build call or read from a parsed message.
+    private long createtime = -1;
+    // Set once parse() has completed, so the payload is decoded at most once.
+    private boolean parsed = false;
+    // Input over the serialized bytes for versions other than v4.
+    private DataInputBuffer parsedInput;
+    // Copy of the serialized bytes (without the leading magic) for v4.
+    private ByteBuffer parsedBinInput;
+
+    /**
+     * Creates a parse-mode instance over a serialized message and reads its
+     * header. The per-attribute payload is decoded later, on first access.
+     *
+     * <p>The readable window starts after the leading 2-byte magic. Its
+     * length is therefore {@code remaining() - 2}; the previous code passed
+     * {@code remaining()} for the non-v4 formats, which let the input extend
+     * two bytes past the end of the buffer. The start index also honours
+     * {@code arrayOffset()} so that sliced buffers are read correctly.
+     *
+     * @param buffer array-backed buffer positioned at the leading magic
+     * @param magic version detected from the magic bytes
+     * @throws IOException if the header cannot be read
+     */
+    private TDMsg1(ByteBuffer buffer, Version magic) throws IOException {
+        version = magic;
+        addmode = false;
+        capacity = 0;
+
+        // first byte after the leading magic, as an index into the backing array
+        final int start = buffer.arrayOffset() + buffer.position() + 2;
+        // bytes left once the leading magic is skipped
+        final int length = buffer.remaining() - 2;
+
+        if (version.intValue() != Version.v4.intValue()) {
+            parsedInput = new DataInputBuffer();
+            parsedInput.reset(buffer.array(), start, length);
+            if (version.intValue() >= Version.v1.intValue()) {
+                createtime = parsedInput.readLong();
+            }
+
+            if (version.intValue() >= Version.v2.intValue()) {
+                this.msgcnt = parsedInput.readInt();
+            }
+
+            attrcnt = parsedInput.readInt();
+        } else {
+            // v4: work on a private copy so absolute offsets start at zero
+            byte[] binMsg = new byte[length];
+            System.arraycopy(buffer.array(), start, binMsg, 0, binMsg.length);
+            parsedBinInput = ByteBuffer.wrap(binMsg);
+            this.createtime = getBinCreatetime(parsedBinInput);
+            this.msgcnt = getBinMsgCnt(parsedBinInput);
+            this.isNumBid = getBinNumFlag(parsedBinInput);
+        }
+    }
+
+    /**
+     * Decodes the attribute table of the pre-v3 formats. Each entry is a UTF
+     * attribute string, a message count (v2 only), a payload length and the
+     * payload itself; the payload is exposed as a view on the input array
+     * rather than copied.
+     *
+     * @throws IOException if the input ends prematurely or cannot be read
+     */
+    private void parseDefault() throws IOException {
+        attr2Rawdata = new LinkedHashMap<String, DataByteBuffer>(
+                attrcnt * 10 / 7);
+        final boolean hasPerAttrCount =
+                version.intValue() == Version.v2.intValue();
+
+        for (int idx = 0; idx < attrcnt; idx++) {
+            final String name = parsedInput.readUTF();
+            final int count = hasPerAttrCount ? parsedInput.readInt() : 0;
+            final int length = parsedInput.readInt();
+            final int offset = parsedInput.getPosition();
+
+            final ByteBuffer payload =
+                    ByteBuffer.wrap(parsedInput.getData(), offset, length);
+            attr2Rawdata.put(name, new DataByteBuffer(count, payload));
+            parsedInput.skip(length);
+        }
+    }
+
+    /**
+     * Decodes the v3 payload, in which each entry carries a common attribute
+     * string and a (possibly Snappy-compressed) body that interleaves
+     * records with their own private attributes.
+     *
+     * <p>Every record is re-grouped under the key
+     * {@code commonAttr + "&" + privateAttr}. The data stored per key starts
+     * with one flag byte (0), followed by length-prefixed records.
+     *
+     * <p>NOTE(review): the early {@code return} statements taken on
+     * implausible lengths skip the final "sync data" loop, so holders created
+     * up to that point keep an unset {@code buffer} - confirm this is the
+     * intended handling of malformed input.
+     *
+     * @throws IOException if the input cannot be read or uncompressed
+     */
+    private void parseMixAttr() throws IOException {
+        attr2Rawdata = new LinkedHashMap<String, DataByteBuffer>(
+                this.msgcnt * 10 / 7);
+
+        for (int i = 0; i < attrcnt; i++) {
+            ByteBuffer bodyBuffer;
+            String commonAttr = parsedInput.readUTF();
+            // len counts the one-byte compress flag plus the body
+            int len = parsedInput.readInt();
+            int compress = parsedInput.readByte();
+            int pos = parsedInput.getPosition();
+
+            if (compress == 1) {
+                byte[] uncompressdata = new byte[Snappy.uncompressedLength(
+                        parsedInput.getData(), pos, len - 1)];
+                int msgLen = Snappy.uncompress(parsedInput.getData(), pos, len 
- 1,
+                        uncompressdata, 0);
+                bodyBuffer = ByteBuffer.wrap(uncompressdata, 0, msgLen);
+            } else {
+                bodyBuffer = ByteBuffer.wrap(parsedInput.getData(), pos, len - 
1);
+            }
+            parsedInput.skip(len - 1);
+
+            while (bodyBuffer.remaining() > 0) {
+                // total message length = (data length + attributes length) * N
+                int singleTotalLen = bodyBuffer.getInt();
+                if (singleTotalLen > bodyBuffer.remaining()) {
+                    return;
+                }
+
+                while (singleTotalLen > 0) {
+                    // single data length
+                    int msgItemLen = bodyBuffer.getInt();
+                    if (msgItemLen <= 0 || msgItemLen > singleTotalLen) {
+                        return;
+                    }
+
+                    // record = flag byte (0) + 4-byte big-endian length + data
+                    byte[] record = new byte[1 + 4 + msgItemLen];
+                    record[0] = 0;
+                    record[1] = (byte) ((msgItemLen >> 24) & 0xFF);
+                    record[2] = (byte) ((msgItemLen >> 16) & 0xFF);
+                    record[3] = (byte) ((msgItemLen >> 8) & 0xFF);
+                    record[4] = (byte) (msgItemLen & 0xFF);
+                    bodyBuffer.get(record, 1 + 4, msgItemLen);
+
+                    // single attribute length
+                    int singleAttrLen = bodyBuffer.getInt();
+                    if (singleAttrLen <= 0 || singleAttrLen > singleTotalLen) {
+                        return;
+                    }
+                    byte[] attrBuf = new byte[singleAttrLen];
+                    bodyBuffer.get(attrBuf, 0, singleAttrLen);
+                    // decoded with the platform default charset
+                    String finalAttr = commonAttr + "&" + new String(attrBuf);
+
+                    DataByteBuffer inputBuffer = attr2Rawdata.get(finalAttr);
+                    if (inputBuffer == null) {
+                        // first record for this key: keep the leading flag byte
+                        inputBuffer = new DataByteBuffer(0,
+                                new DataOutputBuffer(msgItemLen + 4 + 1));
+                        attr2Rawdata.put(finalAttr, inputBuffer);
+                        inputBuffer.inoutBuffer.write(record, 0, msgItemLen + 
4 + 1);
+                    } else {
+                        // later records: append length + data only
+                        inputBuffer.inoutBuffer.write(record, 1, msgItemLen + 
4);
+                    }
+                    // 8 = the two 4-byte length fields just consumed
+                    singleTotalLen = singleTotalLen - msgItemLen - 
singleAttrLen - 8;
+                }
+            }
+        }
+
+        // sync data
+        for (String attr : attr2Rawdata.keySet()) {
+            DataByteBuffer data = attr2Rawdata.get(attr);
+            data.syncByteBuffer();
+        }
+    }
+
+    /**
+     * Decodes the v4 (binary) payload held in {@code parsedBinInput}.
+     *
+     * <p>Header fields are read at fixed offsets, the common attribute string
+     * behind the body is split into a map, and the body is Snappy-uncompressed
+     * when the compress-type bits of the message type say so. If the
+     * extension field signals private attributes, the body is walked record
+     * by record and each record is grouped under the joined common + private
+     * attributes; otherwise the whole body is stored under the common
+     * attributes.
+     *
+     * <p>NOTE(review): the early {@code return} statements inside the record
+     * loop skip the final "sync data" loop, so holders created up to that
+     * point keep an unset {@code buffer} - confirm this is the intended
+     * handling of malformed input.
+     *
+     * @throws IOException if the body cannot be uncompressed or written
+     */
+    private void parseBinMsg() throws IOException {
+        Map<String, String> commonAttrMap = new HashMap<String, String>();
+
+        int totalLen = parsedBinInput.getInt(BIN_MSG_TOTALLEN_OFFSET);
+        final int msgtype = parsedBinInput.get(BIN_MSG_MSGTYPE_OFFSET);
+        int bidNum = parsedBinInput.getShort(BIN_MSG_BID_OFFSET);
+        int tidNum = parsedBinInput.getShort(BIN_MSG_TID_OFFSET);
+        int bodyLen = parsedBinInput.getInt(BIN_MSG_BODYLEN_OFFSET);
+        long dataTime = parsedBinInput.getInt(BIN_MSG_DATATIME_OFFSET);
+        final int extField = parsedBinInput.getShort(BIN_MSG_EXTFIELD_OFFSET);
+        int attrLen = parsedBinInput.getShort(BIN_MSG_BODY_OFFSET + bodyLen);
+        int msgMagic = (parsedBinInput.getShort(BIN_MSG_BODY_OFFSET
+                + bodyLen + BIN_MSG_ATTRLEN_SIZE + attrLen) & 0xFFFF);
+        // the stored value is multiplied by 1000 before being exposed
+        // (presumably seconds -> milliseconds; confirm against the writer)
+        dataTime = dataTime * 1000;
+
+        //read common attributes
+        if (attrLen != 0) {
+            byte[] attr = new byte[attrLen];
+            parsedBinInput.position(BIN_MSG_BODY_OFFSET + bodyLen + 
BIN_MSG_ATTRLEN_SIZE);
+            parsedBinInput.get(attr);
+            // decoded with the platform default charset
+            String strAttr = new String(attr);
+
+            commonAttrMap = new HashMap<String, 
String>(MAP_SPLITTER.split(strAttr));
+        }
+
+        commonAttrMap.put(AttributeConstants.DATA_TIME, 
String.valueOf(dataTime));
+
+        //unzip data
+        // index 0 of the resulting buffer is reserved for the compress flag
+        ByteBuffer bodyBuffer;
+        byte[] body = new byte[bodyLen + 1];
+        parsedBinInput.position(BIN_MSG_BODY_OFFSET);
+        parsedBinInput.get(body, 1, bodyLen);
+        int zipType = (msgtype & 0xE0) >> 5;
+        switch (zipType) {
+            case (BIN_MSG_SNAPPY_TYPE):
+                byte[] uncompressdata =
+                        new byte[Snappy.uncompressedLength(body, 1, 
body.length - 1) + 1];
+                // uncompress flag
+                uncompressdata[0] = 0;
+                int msgLen = Snappy.uncompress(body, 1, body.length - 1,
+                        uncompressdata, 1);
+                bodyBuffer = ByteBuffer.wrap(uncompressdata, 0, msgLen + 1);
+                break;
+
+            case (BIN_MSG_NO_ZIP):
+            default:
+                //set uncompress flag
+                body[0] = 0;
+                bodyBuffer = ByteBuffer.wrap(body, 0, body.length);
+                break;
+        }
+
+        //number bid/tid
+        // numeric ids are used when bit 2 of the extension field is clear
+        boolean isUseNumBid = ((extField & 0x4) == 0x0);
+        if (isUseNumBid) {
+            commonAttrMap.put(AttributeConstants.BUSINESS_ID, 
String.valueOf(bidNum));
+            commonAttrMap.put(AttributeConstants.INTERFACE_ID, 
String.valueOf(tidNum));
+        }
+
+        // bit 0 of the extension field marks per-record private attributes
+        boolean hasOtherAttr = ((extField & 0x1) == 0x1);
+        commonAttrMap.put(AttributeConstants.MESSAGE_COUNT, "1");
+        // with private attributes,
+        // need to splice private attributes + public attributes
+        if (!hasOtherAttr) {
+            // general attributes and data map
+            attr2Rawdata = new LinkedHashMap<String, DataByteBuffer>();
+            attr2Rawdata.put(MAP_JOINER.join(commonAttrMap),
+                    new DataByteBuffer(0, bodyBuffer));
+        } else {
+            attr2Rawdata = new LinkedHashMap<String, DataByteBuffer>(
+                    this.msgcnt * 10 / 7);
+            Map<String, String> finalAttrMap = commonAttrMap;
+
+            //skip compress flag
+            bodyBuffer.get();
+            int bodyBufLen = bodyBuffer.capacity() - 1;
+            while (bodyBufLen > 0) {
+                // get single message length
+                int singleMsgLen = bodyBuffer.getInt();
+                if (singleMsgLen <= 0 || singleMsgLen > bodyBufLen) {
+                    return;
+                }
+
+                // record = flag byte (0) + 4-byte big-endian length + data
+                byte[] record = new byte[1 + 4 + singleMsgLen];
+                record[0] = 0;
+                record[1] = (byte) ((singleMsgLen >> 24) & 0xFF);
+                record[2] = (byte) ((singleMsgLen >> 16) & 0xFF);
+                record[3] = (byte) ((singleMsgLen >> 8) & 0xFF);
+                record[4] = (byte) (singleMsgLen & 0xFF);
+                bodyBuffer.get(record, 1 + 4, singleMsgLen);
+
+                // get single attribute length
+                int singleAttrLen = bodyBuffer.getInt();
+                if (singleAttrLen <= 0 || singleAttrLen > bodyBufLen) {
+                    return;
+                }
+                byte[] attrBuf = new byte[singleAttrLen];
+                bodyBuffer.get(attrBuf, 0, singleAttrLen);
+                String attrBufStr = new String(attrBuf);
+
+                // common attributes are applied last, so they win on key clashes
+                finalAttrMap = new HashMap<String, 
String>(MAP_SPLITTER.split(attrBufStr));
+                finalAttrMap.putAll(commonAttrMap);
+
+                DataByteBuffer inputBuffer = 
attr2Rawdata.get(MAP_JOINER.join(finalAttrMap));
+                if (inputBuffer == null) {
+                    // first record for this key: keep the leading flag byte
+                    inputBuffer = new DataByteBuffer(0,
+                            new DataOutputBuffer(singleMsgLen + 4 + 1));
+                    attr2Rawdata.put(MAP_JOINER.join(finalAttrMap), 
inputBuffer);
+                    inputBuffer.inoutBuffer.write(record, 0, singleMsgLen + 4 
+ 1);
+                } else {
+                    // later records: append length + data only
+                    inputBuffer.inoutBuffer.write(record, 1, singleMsgLen + 4);
+                }
+
+                // 8 = the two 4-byte length fields just consumed
+                bodyBufLen = bodyBufLen - singleMsgLen - singleAttrLen - 8;
+            }
+
+            // sync data
+            for (String attr : attr2Rawdata.keySet()) {
+                DataByteBuffer data = attr2Rawdata.get(attr);
+                data.syncByteBuffer();
+            }
+        }
+    }
+
+    /**
+     * Decodes the payload once, picking the decoder by version: the default
+     * decoder below v3, the mixed-attribute decoder for v3 and the binary
+     * decoder above v3. Later calls are no-ops.
+     *
+     * @throws IOException if the selected decoder fails; the instance then
+     *         stays marked as not parsed
+     */
+    private void parse() throws IOException {
+        if (!parsed) {
+            final int current = version.intValue();
+            final int mixed = Version.v3.intValue();
+            if (current < mixed) {
+                parseDefault();
+            } else if (current == mixed) {
+                parseMixAttr();
+            } else {
+                parseBinMsg();
+            }
+            parsed = true;
+        }
+    }
+
+    /**
+     * Detects the message version from the two magic bytes that frame the
+     * readable region of {@code buffer} at both ends.
+     *
+     * <p>Bytes are fetched with absolute {@code get(int)} calls, so the
+     * buffer's {@code arrayOffset()} is honoured; indexing the backing array
+     * with {@code position()} alone read the wrong bytes for sliced buffers.
+     * The buffer's position is not changed.
+     *
+     * @param buffer serialized message
+     * @return the matching version, or {@code Version.vn} when fewer than
+     *         four bytes remain or no magic pair matches at both ends
+     */
+    private static Version getMagic(ByteBuffer buffer) {
+        // #lizard forgives
+        if (buffer.remaining() < 4) {
+            return Version.vn;
+        }
+        final int first = buffer.position();
+        final int last = buffer.limit() - 1;
+        final byte head0 = buffer.get(first);
+        final byte head1 = buffer.get(first + 1);
+        final byte tail0 = buffer.get(last - 1);
+        final byte tail1 = buffer.get(last);
+
+        if (head0 == MAGIC1[0] && head1 == MAGIC1[1]
+                && tail0 == MAGIC1[0] && tail1 == MAGIC1[1]) {
+            return Version.v1;
+        }
+        if (head0 == MAGIC2[0] && head1 == MAGIC2[1]
+                && tail0 == MAGIC2[0] && tail1 == MAGIC2[1]) {
+            return Version.v2;
+        }
+        if (head0 == MAGIC3[0] && head1 == MAGIC3[1]
+                && tail0 == MAGIC3[0] && tail1 == MAGIC3[1]) {
+            return Version.v3;
+        }
+        if (head0 == MAGIC4[0] && head1 == MAGIC4[1]
+                && tail0 == MAGIC4[0] && tail1 == MAGIC4[1]) {
+            return Version.v4;
+        }
+        if (head0 == MAGIC0[0] && head1 == MAGIC0[1]
+                && tail0 == MAGIC0[0] && tail1 == MAGIC0[1]) {
+            return Version.v0;
+        }
+        return Version.vn;
+    }
+
+    /**
+     * Parses a serialized message held in a byte array.
+     *
+     * @param data the complete serialized message
+     * @return the result of {@link #parseFrom(ByteBuffer)} on a buffer
+     *         wrapping {@code data}
+     */
+    public static TDMsg1 parseFrom(byte[] data) {
+        final ByteBuffer wrapped = ByteBuffer.wrap(data);
+        return parseFrom(wrapped);
+    }
+
+    /**
+     * Parses a serialized message. Only the header is read here; attribute
+     * data is decoded on first access.
+     *
+     * @param buffer the serialized message
+     * @return a parse-mode instance, or {@code null} when the magic bytes are
+     *         not recognised or reading the header raises an
+     *         {@link IOException}
+     */
+    public static TDMsg1 parseFrom(ByteBuffer buffer) {
+        final Version detected = getMagic(buffer);
+        if (detected != Version.vn) {
+            try {
+                return new TDMsg1(buffer, detected);
+            } catch (IOException e) {
+                return null;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Triggers payload decoding if it has not completed yet. A decoding
+     * failure is swallowed, in which case the instance stays unparsed.
+     *
+     * <p>NOTE(review): after a swallowed failure the next accessor call
+     * retries the decode - confirm this best-effort behaviour is intended.
+     */
+    private void makeSureParsed() {
+        if (parsed) {
+            return;
+        }
+        try {
+            parse();
+        } catch (IOException ignored) {
+            // deliberately ignored, see method comment
+        }
+    }
+
+    /**
+     * Returns the attribute strings found in a parsed message, decoding the
+     * payload first if needed.
+     *
+     * @return the key set of the attribute map (a live view, not a copy)
+     * @throws RuntimeException if the instance is in add mode
+     */
+    public Set<String> getAttrs() {
+        checkMode(false);
+        makeSureParsed();
+        final Set<String> names = this.attr2Rawdata.keySet();
+        return names;
+    }
+
+    /**
+     * Returns a copy of the raw data stored for one attribute.
+     *
+     * @param attr attribute string to look up
+     * @return a new array holding the readable bytes of the attribute's
+     *         buffer
+     * @throws RuntimeException if the instance is in add mode
+     */
+    public byte[] getRawData(String attr) {
+        checkMode(false);
+        makeSureParsed();
+        final ByteBuffer raw = getRawDataBuffer(attr);
+        final byte[] copy = new byte[raw.remaining()];
+        System.arraycopy(raw.array(), raw.position(), copy, 0,
+                raw.remaining());
+        return copy;
+    }
+
+    /**
+     * Returns the buffer holding the raw data of one attribute, without
+     * copying.
+     *
+     * @param attr attribute string to look up
+     * @return the {@code buffer} of the attribute's holder
+     * @throws RuntimeException if the instance is in add mode
+     * @throws NullPointerException if {@code attr} is not present
+     */
+    public ByteBuffer getRawDataBuffer(String attr) {
+        checkMode(false);
+        makeSureParsed();
+        final DataByteBuffer holder = this.attr2Rawdata.get(attr);
+        return holder.buffer;
+    }
+
+    /**
+     * Iterates over the records stored for one attribute, each returned as a
+     * fresh byte array.
+     *
+     * @param attr attribute string to look up
+     * @return the result of {@link #getIterator(ByteBuffer)} on the
+     *         attribute's buffer
+     * @throws RuntimeException if the instance is in add mode
+     * @throws NullPointerException if {@code attr} is not present
+     */
+    public Iterator<byte[]> getIterator(String attr) {
+        checkMode(false);
+        makeSureParsed();
+        final DataByteBuffer holder = this.attr2Rawdata.get(attr);
+        return getIterator(holder.buffer);
+    }
+
+    /**
+     * Iterates over the records contained in a raw attribute payload.
+     *
+     * @param rawdata payload starting with the one-byte compress flag
+     * @return the result of {@link #getIterator(ByteBuffer)} on a buffer
+     *         wrapping {@code rawdata}
+     */
+    public static Iterator<byte[]> getIterator(byte[] rawdata) {
+        final ByteBuffer wrapped = ByteBuffer.wrap(rawdata);
+        return getIterator(wrapped);
+    }
+
+    /**
+     * Iterates over the records contained in a raw attribute payload. The
+     * first readable byte is a compress flag (1 = Snappy); the rest is a
+     * sequence of records, each a 4-byte length followed by that many bytes.
+     *
+     * <p>The start index honours {@code arrayOffset()}, so sliced buffers are
+     * read from the right place; the previous code used {@code position()}
+     * alone as an index into the backing array.
+     *
+     * <p>The returned iterator yields {@code null} from {@code next()} when
+     * reading fails, and its {@code remove()} simply consumes the next
+     * record - it does not follow the usual {@link Iterator#remove()}
+     * contract.
+     *
+     * @param rawdata array-backed payload positioned at the compress flag
+     * @return an iterator over copies of the records, or {@code null} when
+     *         the payload cannot be uncompressed
+     */
+    public static Iterator<byte[]> getIterator(ByteBuffer rawdata) {
+        try {
+            final DataInputBuffer input = new DataInputBuffer();
+            byte[] array = rawdata.array();
+            // index into the backing array, valid for sliced buffers too
+            int pos = rawdata.arrayOffset() + rawdata.position();
+            // payload length without the leading flag byte
+            int rem = rawdata.remaining() - 1;
+            int compress = array[pos];
+
+            if (compress == 1) {
+                byte[] uncompressdata = new byte[Snappy.uncompressedLength(
+                        array, pos + 1, rem)];
+                int len = Snappy.uncompress(array, pos + 1, rem,
+                        uncompressdata, 0);
+                input.reset(uncompressdata, len);
+            } else {
+                input.reset(array, pos + 1, rem);
+            }
+
+            return new Iterator<byte[]>() {
+
+                @Override
+                public boolean hasNext() {
+                    try {
+                        return input.available() > 0;
+                    } catch (IOException e) {
+                        e.printStackTrace();
+                    }
+                    return false;
+                }
+
+                @Override
+                public byte[] next() {
+                    try {
+                        int len;
+                        len = input.readInt();
+                        byte[] res = new byte[len];
+                        input.read(res);
+                        return res;
+                    } catch (IOException e) {
+                        e.printStackTrace();
+                    }
+                    return null;
+                }
+
+                @Override
+                public void remove() {
+                    // skips one record rather than removing anything
+                    this.next();
+                }
+            };
+        } catch (IOException e) {
+            e.printStackTrace();
+            return null;
+        }
+
+    }
+
+    /**
+     * Iterates over the records contained in a raw attribute payload, each
+     * exposed as a buffer view.
+     *
+     * @param rawdata payload starting with the one-byte compress flag
+     * @return the result of {@link #getIteratorBuffer(ByteBuffer)} on a
+     *         buffer wrapping {@code rawdata}
+     */
+    public static Iterator<ByteBuffer> getIteratorBuffer(byte[] rawdata) {
+        final ByteBuffer wrapped = ByteBuffer.wrap(rawdata);
+        return getIteratorBuffer(wrapped);
+    }
+
+    /**
+     * Iterates over the records stored for one attribute, each exposed as a
+     * buffer view.
+     *
+     * @param attr attribute string to look up
+     * @return the result of {@link #getIteratorBuffer(ByteBuffer)} on the
+     *         attribute's buffer
+     * @throws RuntimeException if the instance is in add mode
+     * @throws NullPointerException if {@code attr} is not present
+     */
+    public Iterator<ByteBuffer> getIteratorBuffer(String attr) {
+        checkMode(false);
+        makeSureParsed();
+        final DataByteBuffer holder = this.attr2Rawdata.get(attr);
+        return getIteratorBuffer(holder.buffer);
+    }
+
+    /**
+     * Iterates over the records contained in a raw attribute payload without
+     * copying them: every element is a {@link ByteBuffer} view on the
+     * (possibly uncompressed) data. The first readable byte is a compress
+     * flag (1 = Snappy); the rest is a sequence of records, each a 4-byte
+     * length followed by that many bytes.
+     *
+     * <p>The start index honours {@code arrayOffset()}, so sliced buffers are
+     * read from the right place; the previous code used {@code position()}
+     * alone as an index into the backing array.
+     *
+     * <p>The returned iterator yields {@code null} from {@code next()} when
+     * reading fails, and its {@code remove()} simply consumes the next
+     * record - it does not follow the usual {@link Iterator#remove()}
+     * contract.
+     *
+     * @param rawdata array-backed payload positioned at the compress flag
+     * @return an iterator over record views, or {@code null} when the
+     *         payload cannot be uncompressed
+     */
+    public static Iterator<ByteBuffer> getIteratorBuffer(ByteBuffer rawdata) {
+
+        try {
+            final DataInputBuffer input = new DataInputBuffer();
+            byte[] array = rawdata.array();
+            // index into the backing array, valid for sliced buffers too
+            int pos = rawdata.arrayOffset() + rawdata.position();
+            // payload length without the leading flag byte
+            int rem = rawdata.remaining() - 1;
+            int compress = array[pos];
+
+            if (compress == 1) {
+                byte[] uncompressdata = new byte[Snappy.uncompressedLength(
+                        array, pos + 1, rem)];
+                int len = Snappy.uncompress(array, pos + 1, rem,
+                        uncompressdata, 0);
+                input.reset(uncompressdata, len);
+            } else {
+                input.reset(array, pos + 1, rem);
+            }
+
+            // array the record views are cut from
+            final byte[] recordData = input.getData();
+
+            return new Iterator<ByteBuffer>() {
+
+                @Override
+                public boolean hasNext() {
+                    try {
+                        return input.available() > 0;
+                    } catch (IOException e) {
+                        e.printStackTrace();
+                    }
+                    return false;
+                }
+
+                @Override
+                public ByteBuffer next() {
+                    try {
+                        int len = input.readInt();
+                        int recordStart = input.getPosition();
+                        input.skip(len);
+                        return ByteBuffer.wrap(recordData, recordStart, len);
+                    } catch (IOException e) {
+                        e.printStackTrace();
+                    }
+                    return null;
+                }
+
+                @Override
+                public void remove() {
+                    // skips one record rather than removing anything
+                    this.next();
+                }
+            };
+        } catch (IOException e) {
+            e.printStackTrace();
+            return null;
+        }
+
+    }
+
+    /**
+     * Returns the creation time of the message.
+     *
+     * @return the value of {@code createtime}; -1 if it was never set
+     */
+    public long getCreatetime() {
+        return this.createtime;
+    }
+
+    /**
+     * Returns the attribute count read from the header of a parsed message.
+     *
+     * @return the value of {@code attrcnt}; -1 if the header did not provide
+     *         one
+     * @throws RuntimeException if the instance is in add mode
+     */
+    public int getAttrCount() {
+        checkMode(false);
+        return this.attrcnt;
+    }
+
+    /**
+     * Returns the numeric-bid flag of a parsed message.
+     *
+     * @return the value of {@code isNumBid}
+     * @throws RuntimeException if the instance is in add mode
+     */
+    public boolean isNumBid() {
+        checkMode(false);
+        return this.isNumBid;
+    }
+}
diff --git a/inlong-common/src/main/java/org/apache/inlong/commons/msg/TDMsgAttrBuilder.java b/inlong-common/src/main/java/org/apache/inlong/commons/msg/TDMsgAttrBuilder.java
new file mode 100644
index 0000000..df90de0
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/msg/TDMsgAttrBuilder.java
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.commons.msg;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+public class TDMsgAttrBuilder {
+
+    /**
+     * Partition granularity of a time attribute, identified by a one-letter
+     * code.
+     */
+    public enum PartitionUnit {
+        DAY("d"),
+        HOUR("h"),
+        HALFHOUR("n"),
+        QUARTER("q"),
+        TENMINS("t"),
+        FIVEMINS("f");
+
+        /** Lookup table from one-letter code to constant. */
+        private static final Map<String, PartitionUnit> BY_CODE =
+                new HashMap<String, PartitionUnit>();
+
+        static {
+            for (PartitionUnit unit : values()) {
+                BY_CODE.put(unit.value, unit);
+            }
+        }
+
+        private final String value;
+
+        PartitionUnit(String value) {
+            this.value = value;
+        }
+
+        /**
+         * Resolves a one-letter code.
+         *
+         * @param p code to resolve
+         * @return the matching constant, or {@link #HOUR} when the code is
+         *         unknown
+         */
+        public static PartitionUnit of(String p) {
+            final PartitionUnit unit = BY_CODE.get(p);
+            return unit != null ? unit : HOUR;
+        }
+
+        /** Returns the one-letter code. */
+        @Override
+        public String toString() {
+            return value;
+        }
+    }
+
+    /**
+     * Format of a time value, identified by a short code starting with
+     * {@code #}.
+     */
+    public enum TimeType {
+        MS("#ms"),
+        S("#s"),
+        /** yyyy-MM-dd HH:mm:ss */
+        STANDARD("#"),
+        /** yyyyMMddHH */
+        NORMAL("#n");
+
+        /** Lookup table from code to constant. */
+        private static final Map<String, TimeType> BY_CODE =
+                new HashMap<String, TimeType>();
+
+        static {
+            for (TimeType kind : values()) {
+                BY_CODE.put(kind.value, kind);
+            }
+        }
+
+        private final String value;
+
+        TimeType(String value) {
+            this.value = value;
+        }
+
+        /**
+         * Resolves a code.
+         *
+         * @param tt code to resolve
+         * @return the matching constant, or {@link #STANDARD} when the code
+         *         is unknown
+         */
+        public static TimeType of(String tt) {
+            final TimeType kind = BY_CODE.get(tt);
+            return kind != null ? kind : STANDARD;
+        }
+
+        /** Returns the code. */
+        @Override
+        public String toString() {
+            return value;
+        }
+    }
+
+    /**
+     * Fluent builder for an {@code m=0} attribute string of the form
+     * {@code m=0[&s=...]&iname=<id>&t=<partition time>}.
+     *
+     * <p>Two defects of the previous version are fixed in
+     * {@link #buildAttr()}: the partition units HALFHOUR, TENMINS and
+     * FIVEMINS produced the literal text {@code &t=null}, and every call
+     * appended to the shared buffer so that a second call duplicated the
+     * {@code iname} and {@code t} fields.
+     */
+    public static class MsgAttrProtocolM0 {
+        private final StringBuffer attrBuffer;
+        private String id = null;
+        private String t = null;
+        private TimeType tt = TimeType.NORMAL;
+        private PartitionUnit p = PartitionUnit.HOUR;
+
+        public MsgAttrProtocolM0() {
+            attrBuffer = new StringBuffer();
+            attrBuffer.append("m=0");
+        }
+
+        /** Sets the value emitted as {@code iname}; required. */
+        public MsgAttrProtocolM0 setId(String id) {
+            this.id = id;
+            return this;
+        }
+
+        /** Appends an {@code &s=} field immediately; each call adds one. */
+        public MsgAttrProtocolM0 setSpliter(String s) {
+            attrBuffer.append("&s=").append(s);
+            return this;
+        }
+
+        /** Sets the time value, interpreted according to the time type; required. */
+        public MsgAttrProtocolM0 setTime(String t) {
+            this.t = t;
+            return this;
+        }
+
+        /** Sets the time value from a number, stored as its decimal string. */
+        public MsgAttrProtocolM0 setTime(long t) {
+            return this.setTime(String.valueOf(t));
+        }
+
+        /** Sets the time type from its code; unknown codes map to STANDARD. */
+        public MsgAttrProtocolM0 setTimeType(String tt) {
+            this.tt = TimeType.of(tt);
+            return this;
+        }
+
+        /** Sets the time type. */
+        public MsgAttrProtocolM0 setTimeType(TimeType tt) {
+            this.tt = tt;
+            return this;
+        }
+
+        /** Sets the partition unit from its code; unknown codes map to HOUR. */
+        public MsgAttrProtocolM0 setPartitionUnit(String p) {
+            this.p = PartitionUnit.of(p);
+            return this;
+        }
+
+        /** Sets the partition unit. */
+        public MsgAttrProtocolM0 setPartitionUnit(PartitionUnit p) {
+            this.p = p;
+            return this;
+        }
+
+        /**
+         * Builds the attribute string. The builder's own state is left
+         * untouched, so the method may be called repeatedly.
+         *
+         * @return the complete attribute string
+         * @throws Exception if the id or the time has not been set, or the
+         *         time value cannot be converted
+         */
+        public String buildAttr() throws Exception {
+            if (id == null) {
+                throw new Exception("id is null");
+            }
+
+            if (t == null) {
+                throw new Exception("t is null");
+            }
+
+            Date d = transData(this.tt, t);
+
+            // assemble on a copy so repeated calls do not accumulate fields
+            StringBuilder attr = new StringBuilder(attrBuffer);
+            attr.append("&iname=").append(id);
+            attr.append("&t=").append(formatPartition(this.p, d));
+            return attr.toString();
+        }
+
+        /**
+         * Renders a date for a partition unit: {@code yyyyMMdd} for DAY,
+         * {@code yyyyMMddHH} for HOUR (also used when no unit is set), and
+         * {@code yyyyMMddHH} plus a unit letter and the zero-based slot
+         * within the hour for the sub-hour units.
+         */
+        private static String formatPartition(PartitionUnit unit, Date d) {
+            if (unit == PartitionUnit.DAY) {
+                return new SimpleDateFormat("yyyyMMdd").format(d);
+            }
+
+            final String hour = new SimpleDateFormat("yyyyMMddHH").format(d);
+            final String tag;
+            final long slotMinutes;
+            if (unit == PartitionUnit.HALFHOUR) {
+                tag = "n";
+                slotMinutes = 30L;
+            } else if (unit == PartitionUnit.QUARTER) {
+                tag = "q";
+                slotMinutes = 15L;
+            } else if (unit == PartitionUnit.TENMINS) {
+                tag = "t";
+                slotMinutes = 10L;
+            } else if (unit == PartitionUnit.FIVEMINS) {
+                tag = "f";
+                slotMinutes = 5L;
+            } else {
+                return hour;
+            }
+
+            final long millisIntoHour = d.getTime() % (60L * 60 * 1000);
+            final int idx = (int) (millisIntoHour / (slotMinutes * 60 * 1000));
+            return hour + tag + idx;
+        }
+    }
+
+    public static class MsgAttrProtocolM100 {
+
+        private final StringBuffer attrBuffer;
+        private String id = null;
+        private String t = null;
+
+        private int idp = -1;
+        private int tp = -1;
+        private TimeType tt = null;
+        private PartitionUnit p = null;
+
+        public MsgAttrProtocolM100() {
+            attrBuffer = new StringBuffer();
+            attrBuffer.append("m=100");
+        }
+
+        public MsgAttrProtocolM100 setSpliter(String s) {
+            attrBuffer.append("&s=").append(s);
+            return this;
+        }
+
+        public MsgAttrProtocolM100 setId(String id) {
+            this.id = id;
+            return this;
+        }
+
+        public MsgAttrProtocolM100 setTime(String t) {
+            this.t = t;
+            return this;
+        }
+
+        public MsgAttrProtocolM100 setTime(long t) {
+            return this.setTime(String.valueOf(t));
+        }
+
+        public MsgAttrProtocolM100 setIdPos(int idp) {
+            this.idp = idp;
+            return this;
+        }
+
+        public MsgAttrProtocolM100 setTimePos(int tp) {
+            this.tp = tp;
+            return this;
+        }
+
+        public MsgAttrProtocolM100 setTimeType(String tt) {
+            return this.setTimeType(TimeType.of(tt));
+        }
+
+        public MsgAttrProtocolM100 setTimeType(TimeType tt) {
+            this.tt = tt;
+            return this;
+        }
+
+        public MsgAttrProtocolM100 setPartitionUnit(String p) {
+            return this.setPartitionUnit(PartitionUnit.of(p));
+        }
+
+        public MsgAttrProtocolM100 setPartitionUnit(PartitionUnit p) {
+            this.p = p;
+            return this;
+        }
+
+        /**
+         * Appends the collected settings to the attribute buffer and returns
+         * the resulting attribute string.
+         *
+         * <p>Id: {@code &iname=} is written when an id is set; otherwise
+         * {@code &idp=} is written when the id position is non-negative.
+         *
+         * <p>Time: when an explicit time value is set, {@code &t=} is derived
+         * from it. A {@code TimeType.NORMAL} value is passed through unchanged
+         * if makeSureTimeNormal accepts it; any other value is parsed by
+         * transData and formatted for the partition unit (HOUR when none was
+         * chosen). When no explicit time is set and the time position is
+         * non-negative, {@code &tp=} is written, followed by {@code &tt=} and
+         * {@code &p=} for whichever of the two are set.
+         *
+         * <p>Each call appends to the same buffer again, so the method is
+         * meant to be called once per builder.
+         *
+         * @return the content of the attribute buffer
+         * @throws IllegalStateException if an explicit time value has to be
+         *         formatted but transData produced no date for the current
+         *         time type (for example because no time type was set)
+         * @throws Exception if transData fails to parse the time value
+         */
+        public String buildAttr() throws Exception {
+            // #lizard forgives
+            if (id != null) {
+                attrBuffer.append("&iname=").append(id);
+            } else if (idp >= 0) {
+                attrBuffer.append("&idp=").append(idp);
+            }
+            if (t != null) {
+                String tstr = null;
+                if (tt == TimeType.NORMAL) {
+                    if (makeSureTimeNormal(t)) {
+                        tstr = t;
+                    }
+                } else {
+                    if (this.p == null) {
+                        this.p = PartitionUnit.HOUR;
+                    }
+                    Date d = transData(tt, t);
+
+                    // Pick the date pattern and, for sub-hour units, the
+                    // suffix letter and slot width in minutes.
+                    String pattern = null;
+                    String slotTag = null;
+                    int slotMinutes = 0;
+                    if (this.p == PartitionUnit.DAY) {
+                        pattern = "yyyyMMdd";
+                    } else if (this.p == PartitionUnit.HOUR) {
+                        pattern = "yyyyMMddHH";
+                    } else if (this.p == PartitionUnit.HALFHOUR) {
+                        pattern = "yyyyMMddHH";
+                        slotTag = "n";
+                        slotMinutes = 30;
+                    } else if (this.p == PartitionUnit.QUARTER) {
+                        pattern = "yyyyMMddHH";
+                        slotTag = "q";
+                        slotMinutes = 15;
+                    } else if (this.p == PartitionUnit.TENMINS) {
+                        pattern = "yyyyMMddHH";
+                        slotTag = "t";
+                        slotMinutes = 10;
+                    } else if (this.p == PartitionUnit.FIVEMINS) {
+                        pattern = "yyyyMMddHH";
+                        slotTag = "f";
+                        slotMinutes = 5;
+                    }
+
+                    if (pattern != null) {
+                        if (d == null) {
+                            // transData returns null for a null/unhandled
+                            // time type; fail with a message instead of a
+                            // bare NullPointerException further down.
+                            throw new IllegalStateException(
+                                    "cannot interpret time value '" + t
+                                            + "' with time type " + tt);
+                        }
+                        tstr = new SimpleDateFormat(pattern).format(d);
+                        if (slotTag != null) {
+                            // Use the local minute-of-hour so the slot index
+                            // matches the locally formatted hour even in time
+                            // zones whose offset is not a whole hour.
+                            java.util.Calendar cal =
+                                    java.util.Calendar.getInstance();
+                            cal.setTime(d);
+                            int idx = cal.get(java.util.Calendar.MINUTE)
+                                    / slotMinutes;
+                            tstr = tstr + slotTag + idx;
+                        }
+                    }
+                }
+
+                if (tstr != null) {
+                    attrBuffer.append("&t=").append(tstr);
+                }
+
+            } else if (tp >= 0) {
+                attrBuffer.append("&tp=").append(tp);
+                if (this.tt != null) {
+                    attrBuffer.append("&tt=").append(tt);
+                }
+                if (this.p != null) {
+                    attrBuffer.append("&p=").append(p.toString());
+                }
+            }
+            return attrBuffer.toString();
+        }
+
+        /**
+         * Length-based sanity check for a NORMAL time string: accepts 8 or 10
+         * characters, or 12 characters with a 'p' at index 10.
+         *
+         * @param time the time string to check
+         * @return true if the string has one of the accepted shapes
+         */
+        private boolean makeSureTimeNormal(String time) {
+            switch (time.length()) {
+                case 8:
+                case 10:
+                    return true;
+                case 12:
+                    return time.charAt(10) == 'p';
+                default:
+                    return false;
+            }
+        }
+
+    }
+
+    public static MsgAttrProtocolM0 getProtololM0() { // factory; "Protolol" (sic) is the public name
+        return new MsgAttrProtocolM0(); // a fresh builder on every call
+    }
+
+    public static MsgAttrProtocolM100 getProtololM100() { // factory; "Protolol" (sic) is the public name
+        return new MsgAttrProtocolM100(); // a fresh builder on every call
+    }
+
+    /**
+     * Demo entry point: prints sample attribute strings produced by the
+     * m=0 and m=100 builders. Each m=100 sample is followed by a hint that
+     * lists the settings left at their defaults.
+     *
+     * @param args unused
+     * @throws Exception if one of the sample builders fails in buildAttr()
+     */
+    public static void main(String[] args) throws Exception {
+
+        SimpleDateFormat f =
+                new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        SimpleDateFormat f1 =
+                new SimpleDateFormat("yyyyMMddHH");
+
+        // ---- m=0 samples ----
+        System.out.println(TDMsgAttrBuilder.getProtololM0()
+                .setId("interfaceid").setTimeType(TimeType.S)
+                .setTime(String.valueOf(System.currentTimeMillis() / 1000))
+                .buildAttr());
+        System.out.println(TDMsgAttrBuilder.getProtololM0()
+                .setId("interfaceid").setTime(System.currentTimeMillis())
+                .setTimeType(TimeType.MS).buildAttr());
+        System.out.println(TDMsgAttrBuilder.getProtololM0()
+                .setId("interfaceid")
+                .setTime(f1.format(new Date(System.currentTimeMillis())))
+                .buildAttr());
+        System.out.println(TDMsgAttrBuilder.getProtololM0()
+                .setId("interfaceid")
+                .setTime(f.format(new Date(System.currentTimeMillis())))
+                .setTimeType(TimeType.STANDARD).buildAttr());
+        System.out.println(TDMsgAttrBuilder.getProtololM0()
+                .setId("interfaceid")
+                .setTime(f1.format(new Date(System.currentTimeMillis())))
+                .setTimeType(TimeType.NORMAL).buildAttr());
+        System.out.println(TDMsgAttrBuilder.getProtololM0()
+                .setId("interfaceid").setTimeType(TimeType.S)
+                .setTime(String.valueOf(System.currentTimeMillis() / 1000))
+                .setPartitionUnit(PartitionUnit.DAY).buildAttr());
+        System.out.println(TDMsgAttrBuilder.getProtololM0()
+                .setId("interfaceid").setTimeType(TimeType.S)
+                .setTime(String.valueOf(System.currentTimeMillis() / 1000))
+                .setPartitionUnit(PartitionUnit.HOUR).buildAttr());
+        System.out.println(TDMsgAttrBuilder.getProtololM0()
+                .setId("interfaceid").setTimeType(TimeType.S)
+                .setTime(String.valueOf(System.currentTimeMillis() / 1000))
+                .setPartitionUnit(PartitionUnit.QUARTER).buildAttr());
+        System.out.println();
+
+        // ---- m=100 samples ----
+        System.out.print(TDMsgAttrBuilder.getProtololM100().buildAttr());
+        System.out.println("\t\t\t\t\t\t// ---- all the param is "
+                + "default : s=\\t, idp=0, tp=1, tt=#ms, p=h ");
+
+        System.out.print(TDMsgAttrBuilder.getProtololM100().setSpliter(",")
+                .buildAttr());
+        System.out.println("\t\t\t\t\t// ---- : idp=0, tp=1, tt=#ms, p=h ");
+
+        System.out.print(TDMsgAttrBuilder.getProtololM100().setSpliter(",")
+                .setIdPos(0).buildAttr());
+        System.out.println("\t\t\t\t\t// ---- : tp=1, tt=#ms, p=h ");
+
+        System.out.print(TDMsgAttrBuilder.getProtololM100().setSpliter(",")
+                .setTimePos(1).buildAttr());
+        System.out.println("\t\t\t\t\t// ---- : idp=0, tt=#ms, p=h ");
+
+        System.out.print(TDMsgAttrBuilder.getProtololM100().setSpliter(",")
+                .setIdPos(0).setTimePos(1).buildAttr());
+        System.out.println("\t\t\t\t// ---- : tt=#ms, p=h ");
+
+        System.out.print(TDMsgAttrBuilder.getProtololM100().setSpliter(",")
+                .setIdPos(0).setTimePos(1).setTimeType(TimeType.S)
+                .buildAttr());
+        System.out.println("\t\t\t// ---- : p=h ");
+
+        System.out.print(TDMsgAttrBuilder.getProtololM100().setSpliter(",")
+                .setIdPos(0).setTimePos(1)
+                .setPartitionUnit(PartitionUnit.QUARTER).buildAttr());
+        // Only the time type is left unset here; the hint uses the same
+        // default (#ms) as every other hint line.
+        System.out.println("\t\t\t// ---- : tt=#ms ");
+
+        System.out.print(TDMsgAttrBuilder.getProtololM100().setSpliter(",")
+                .setIdPos(0).setTimePos(1).setTimeType(TimeType.MS)
+                .setPartitionUnit(PartitionUnit.QUARTER).buildAttr());
+        System.out.println("\t\t\t// ---- : all ");
+
+        System.out.print(TDMsgAttrBuilder.getProtololM100()
+                .setId("interfaceid").setSpliter(",").setIdPos(0).setTimePos(1)
+                .setTimeType(TimeType.MS)
+                .setPartitionUnit(PartitionUnit.QUARTER).buildAttr());
+        System.out.println("\t// ---- : id is set so idp is ignored ");
+
+        System.out.print(TDMsgAttrBuilder.getProtololM100().setSpliter(",")
+                .setIdPos(0).setTimePos(1).setTime(System.currentTimeMillis())
+                .setTimeType(TimeType.MS)
+                .setPartitionUnit(PartitionUnit.QUARTER).buildAttr());
+        System.out.println("\t\t\t// ---- : t is set so tp is ignored ");
+
+        System.out.print(TDMsgAttrBuilder.getProtololM100()
+                .setId("interfaceid").setSpliter(",").setIdPos(0).setTimePos(1)
+                .setTime(System.currentTimeMillis()).setTimeType(TimeType.MS)
+                .setPartitionUnit(PartitionUnit.QUARTER).buildAttr());
+        System.out.println("\t\t// ---- : id and t are all set so "
+                + "idpos and tp are all ignored ");
+
+        System.out.print(TDMsgAttrBuilder.getProtololM100()
+                .setId("interfaceid").setSpliter(",").setIdPos(0).setTimePos(1)
+                .setTime(f1.format(new Date(System.currentTimeMillis())))
+                .setTimeType(TimeType.NORMAL)
+                .setPartitionUnit(PartitionUnit.QUARTER).buildAttr());
+        System.out.println("\t\t// ---- : TimeType.NORMAL ");
+
+        System.out.println("\nAttention !!!! m=0 is contained by m=100, "
+                + "so just use m=100");
+
+        System.out.print(TDMsgAttrBuilder.getProtololM100().setSpliter(",")
+                .setIdPos(0).setTimePos(1)
+                .setTimeType(TimeType.STANDARD)
+                .setPartitionUnit(PartitionUnit.QUARTER).buildAttr());
+
+    }
+
+    /**
+     * Converts a textual time value into a {@link Date} according to the
+     * given time type: MS is read as epoch milliseconds, S as epoch seconds,
+     * STANDARD with the pattern "yyyy-MM-dd HH:mm:ss" and NORMAL with the
+     * pattern "yyyyMMddHH".
+     *
+     * @param tt the time type that says how to read timeStr; may be null
+     * @param timeStr the time value as text
+     * @return the parsed date, or null when tt is null or none of the four
+     *         handled types, so callers must check the result
+     * @throws Exception if timeStr is not a valid number (MS, S) or does not
+     *         match the pattern (STANDARD, NORMAL)
+     */
+    private static Date transData(TimeType tt, String timeStr)
+            throws Exception {
+        if (tt == TimeType.MS) {
+            // parseLong avoids the boxing round trip of Long.valueOf
+            return new Date(Long.parseLong(timeStr));
+        }
+        if (tt == TimeType.S) {
+            return new Date(Long.parseLong(timeStr) * 1000);
+        }
+        if (tt == TimeType.STANDARD) {
+            return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(timeStr);
+        }
+        if (tt == TimeType.NORMAL) {
+            return new SimpleDateFormat("yyyyMMddHH").parse(timeStr);
+        }
+        return null;
+    }
+}
+}
diff --git a/pom.xml b/pom.xml
index 362adf6..7d26759 100644
--- a/pom.xml
+++ b/pom.xml
@@ -84,6 +84,7 @@
     </mailingLists>
 
   <modules>
+    <module>inlong-common</module>
     <module>inlong-tubemq</module>
   </modules>
 

Reply via email to