This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5ad6a4b [IOTDB-1880] Compatibility of Apache IoTDB with InfluxDB -
Data Record Insertion (#4231)
5ad6a4b is described below
commit 5ad6a4b040b1bd87bcb1f2b5a89c8ca9f89e5bf9
Author: Xieqijun <[email protected]>
AuthorDate: Thu Oct 28 10:04:05 2021 +0800
[IOTDB-1880] Compatibility of Apache IoTDB with InfluxDB - Data Record
Insertion (#4231)
---
.../influxdb/protocol/input/InfluxLineProtocol.g4 | 93 ++++++++++++
.../org/apache/iotdb/influxdb/IoTDBInfluxDB.java | 31 ++--
.../iotdb/influxdb/example/InfluxDBExample.java | 2 +-
.../influxdb/protocol/input/InfluxLineParser.java | 165 +++++++++++++++++++++
.../influxdb/integration/IoTDBInfluxDBIT.java | 3 +
.../protocol/input/InfluxLineParserTest.java | 55 +++++++
6 files changed, 339 insertions(+), 10 deletions(-)
diff --git
a/influxdb-protocol/src/main/antlr4/org/apache/iotdb/influxdb/protocol/input/InfluxLineProtocol.g4
b/influxdb-protocol/src/main/antlr4/org/apache/iotdb/influxdb/protocol/input/InfluxLineProtocol.g4
new file mode 100644
index 0000000..9871622
--- /dev/null
+++
b/influxdb-protocol/src/main/antlr4/org/apache/iotdb/influxdb/protocol/input/InfluxLineProtocol.g4
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ /**
+ * This file contains code copied from the
+ * <a href="https://github.com/apache/druid/blob/master/extensions-contrib/influx-extensions/src/main/antlr4/org/apache/druid/data/input/influx/InfluxLineProtocol.g4">
+ * Apache Druid InfluxLineProtocol.g4 grammar</a>,
+ * licensed under the Apache License, Version 2.0.
+ **/
+
+// ANTLR4 grammar for InfluxDB line protocol records.
+grammar InfluxLineProtocol;
+
+// Entry rule: one or more records separated by '\n', an optional trailing '\n', then end of input.
+lines
+ : line ('\n' line)* '\n'? EOF
+;
+
+// One record: a leading identifier, an optional ','-introduced tag set, a single space,
+// the field set, and an optional timestamp introduced by a single space.
+line
+ : identifier (',' tag_set)? ' ' field_set (' ' timestamp)?
+;
+
+// The timestamp is a NUMBER token; note that NUMBER also admits a fraction and an 'i' suffix.
+timestamp
+ : NUMBER
+;
+
+// One or more comma-separated field pairs.
+field_set
+ : field_pair (',' field_pair)*
+;
+
+// One or more comma-separated tag pairs.
+tag_set
+ : tag_pair (',' tag_pair)*
+;
+
+// A tag: key and value are both identifiers.
+tag_pair
+ : identifier '=' identifier
+;
+
+// A field: an identifier key and a typed value.
+field_pair
+ : identifier '=' field_value
+;
+
+// NUMBER and BOOLEAN are listed as alternatives so that text the lexer tokenizes as
+// a number or boolean is still accepted where an identifier is expected.
+identifier
+ : IDENTIFIER_STRING | NUMBER | BOOLEAN
+;
+
+// A field value is a double-quoted string, a number, or a boolean.
+field_value
+ : QUOTED_STRING | NUMBER | BOOLEAN
+;
+
+// NOTE(review): this rule is not referenced by any other rule in this grammar.
+eol
+ : NEWLINE | EOF
+;
+
+// A single line feed character.
+NEWLINE
+ : '\n'
+;
+
+// Optional '-', an integer part, an optional fractional part, and an optional trailing 'i'.
+// There is no alternative for exponent notation.
+NUMBER
+ : '-'? INT ('.' [0-9] +) ? 'i'?
+;
+
+// The accepted spellings of true and false.
+BOOLEAN
+ : 'TRUE' | 'true' | 'True' | 't' | 'T' | 'FALSE' | 'False' | 'false' | 'F'
| 'f'
+;
+
+// A double-quoted string: any characters other than '"' and '\', or one of the
+// escape sequences defined by StringFieldEscapeSequence.
+QUOTED_STRING
+ : '"' (StringFieldEscapeSequence | ~(["\\]) )* '"'
+;
+// One or more characters other than ',', '=', ' ', '\n' and '\', or one of the
+// escape sequences defined by IdentifierEscapeSequence.
+IDENTIFIER_STRING
+ : (IdentifierEscapeSequence | ~([,= \n\\]) )+
+;
+// A backslash followed by ',', '=', ' ' or '\'.
+fragment IdentifierEscapeSequence
+ : '\\' [,= \\]
+;
+// A backslash followed by '"' or '\'.
+fragment StringFieldEscapeSequence
+ : '\\' ["\\]
+;
+// A non-negative integer without leading zeros.
+fragment INT
+ : '0' | [1-9] [0-9]*
+;
\ No newline at end of file
diff --git
a/influxdb-protocol/src/main/java/org/apache/iotdb/influxdb/IoTDBInfluxDB.java
b/influxdb-protocol/src/main/java/org/apache/iotdb/influxdb/IoTDBInfluxDB.java
index 9cf2ba0..e199c77 100644
---
a/influxdb-protocol/src/main/java/org/apache/iotdb/influxdb/IoTDBInfluxDB.java
+++
b/influxdb-protocol/src/main/java/org/apache/iotdb/influxdb/IoTDBInfluxDB.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.influxdb;
import org.apache.iotdb.influxdb.protocol.constant.InfluxDBConstant;
import org.apache.iotdb.influxdb.protocol.impl.IoTDBInfluxDBService;
+import org.apache.iotdb.influxdb.protocol.input.InfluxLineParser;
import org.apache.iotdb.influxdb.protocol.util.ParameterUtils;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
@@ -30,7 +31,11 @@ import org.apache.iotdb.session.SessionDataSet;
import org.influxdb.BatchOptions;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBException;
-import org.influxdb.dto.*;
+import org.influxdb.dto.BatchPoints;
+import org.influxdb.dto.Point;
+import org.influxdb.dto.Pong;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
import org.influxdb.impl.TimeUtil;
import java.net.URI;
@@ -120,12 +125,12 @@ public class IoTDBInfluxDB implements InfluxDB {
@Override
public void write(final String records) {
- throw new
UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED);
+ write(null, null, null, null, records);
}
@Override
public void write(final List<String> records) {
- throw new
UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED);
+ write(String.join("\n", records));
}
@Override
@@ -134,7 +139,7 @@ public class IoTDBInfluxDB implements InfluxDB {
final String retentionPolicy,
final ConsistencyLevel consistency,
final String records) {
- throw new
UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED);
+ write(database, retentionPolicy, consistency, null, records);
}
@Override
@@ -144,7 +149,14 @@ public class IoTDBInfluxDB implements InfluxDB {
final ConsistencyLevel consistency,
final TimeUnit precision,
final String records) {
- throw new
UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED);
+ BatchPoints batchPoints =
+ BatchPoints.database(database)
+ .retentionPolicy(retentionPolicy)
+ .consistency(consistency)
+ .precision(precision)
+ .points(InfluxLineParser.parserRecordsToPoints(records, precision))
+ .build();
+ write(batchPoints);
}
@Override
@@ -153,7 +165,7 @@ public class IoTDBInfluxDB implements InfluxDB {
final String retentionPolicy,
final ConsistencyLevel consistency,
final List<String> records) {
- throw new
UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED);
+ write(database, retentionPolicy, consistency, null, String.join("\n",
records));
}
@Override
@@ -163,17 +175,17 @@ public class IoTDBInfluxDB implements InfluxDB {
final ConsistencyLevel consistency,
final TimeUnit precision,
final List<String> records) {
- throw new
UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED);
+ write(database, retentionPolicy, consistency, precision, String.join("\n",
records));
}
@Override
public void write(final int udpPort, final String records) {
- throw new
UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED);
+ write(records);
}
@Override
public void write(final int udpPort, final List<String> records) {
- throw new
UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED);
+ write(String.join("\n", records));
}
@Override
@@ -414,6 +426,7 @@ public class IoTDBInfluxDB implements InfluxDB {
while (sessionDataSet.hasNext()) {
version = sessionDataSet.next().getFields().get(0).getStringValue();
}
+ sessionDataSet.closeOperationHandle();
return version;
} catch (StatementExecutionException | IoTDBConnectionException e) {
throw new InfluxDBException(e.getMessage());
diff --git
a/influxdb-protocol/src/main/java/org/apache/iotdb/influxdb/example/InfluxDBExample.java
b/influxdb-protocol/src/main/java/org/apache/iotdb/influxdb/example/InfluxDBExample.java
index 4b8904d..e8f01d5 100644
---
a/influxdb-protocol/src/main/java/org/apache/iotdb/influxdb/example/InfluxDBExample.java
+++
b/influxdb-protocol/src/main/java/org/apache/iotdb/influxdb/example/InfluxDBExample.java
@@ -48,7 +48,7 @@ public class InfluxDBExample {
Map<String, Object> fields = new HashMap<>();
tags.put("name", "xie");
tags.put("sex", "m");
- fields.put("score", 87);
+ fields.put("score", 87.0);
fields.put("tel", "110");
fields.put("country", "china");
builder.tag(tags);
diff --git
a/influxdb-protocol/src/main/java/org/apache/iotdb/influxdb/protocol/input/InfluxLineParser.java
b/influxdb-protocol/src/main/java/org/apache/iotdb/influxdb/protocol/input/InfluxLineParser.java
new file mode 100644
index 0000000..5f74111
--- /dev/null
+++
b/influxdb-protocol/src/main/java/org/apache/iotdb/influxdb/protocol/input/InfluxLineParser.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * This is an InfluxDB line protocol parser.
+ *
+ * <p>See <a
+ * href="https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_reference/">Line
+ * Protocol</a>.
+ *
+ * <p>This class contains code copied from the <a
+ * href="https://github.com/apache/druid/blob/master/extensions-contrib/influx-extensions/src/main/java/org/apache/druid/data/input/influx/InfluxParser.java">
+ * Apache Druid InfluxDB Parser</a>, licensed under the Apache License, Version 2.0.
+ */
+package org.apache.iotdb.influxdb.protocol.input;
+
+import org.antlr.v4.runtime.*;
+import org.influxdb.InfluxDBException;
+import org.influxdb.dto.Point;
+
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+/**
+ * Parses InfluxDB line protocol text into influxdb-java {@link Point} objects, using the
+ * ANTLR-generated {@code InfluxLineProtocolLexer} and {@code InfluxLineProtocolParser}.
+ */
+public class InfluxLineParser {
+
+  /** Matches an escaped double quote inside a quoted string field value. */
+  private static final Pattern BACKSLASH_PATTERN = Pattern.compile("\\\\\"");
+
+  /** Matches a backslash-escaped comma, equals sign or space inside an identifier. */
+  private static final Pattern IDENTIFIER_PATTERN = Pattern.compile("\\\\([,= ])");
+
+  /** Separates the records of a batch; accepts both LF and CRLF line endings. */
+  private static final Pattern LINE_SEPARATOR_PATTERN = Pattern.compile("\\r?\\n");
+
+  /**
+   * Parses a batch of newline-separated records, interpreting timestamps as nanoseconds.
+   *
+   * @param records newline-separated line protocol records
+   * @return one point per non-blank record, in input order
+   * @throws InfluxDBException if any record cannot be parsed
+   */
+  public static Collection<Point> parserRecordsToPoints(String records) {
+    return parserRecordsToPoints(records, null);
+  }
+
+  /**
+   * Parses a batch of newline-separated records.
+   *
+   * @param records newline-separated line protocol records
+   * @param precision unit of the timestamps in {@code records}; {@code null} means nanoseconds
+   * @return one point per non-blank record, in input order
+   * @throws InfluxDBException if any record cannot be parsed
+   */
+  public static Collection<Point> parserRecordsToPoints(String records, TimeUnit precision) {
+    if (precision == null) {
+      precision = TimeUnit.NANOSECONDS;
+    }
+    List<Point> points = new ArrayList<>();
+    for (String record : LINE_SEPARATOR_PATTERN.split(records)) {
+      // Blank lines carry no data; skipping them keeps one stray newline from failing the batch.
+      if (record.trim().isEmpty()) {
+        continue;
+      }
+      points.add(parseToPoint(record, precision));
+    }
+    return points;
+  }
+
+  /**
+   * Parses a single record, interpreting its timestamp as nanoseconds.
+   *
+   * @param input exactly one line protocol record
+   * @return the parsed point
+   * @throws InfluxDBException if the record cannot be parsed
+   */
+  public static Point parseToPoint(String input) {
+    return parseToPoint(input, null);
+  }
+
+  /**
+   * Parses a single record.
+   *
+   * @param input exactly one line protocol record
+   * @param precision unit of the record's timestamp; {@code null} means nanoseconds
+   * @return the parsed point; when the record has no timestamp, the current time expressed in
+   *     {@code precision} is used
+   * @throws InfluxDBException if the input has syntax errors, does not contain exactly one
+   *     record, or contains a number or timestamp that cannot be converted
+   */
+  public static Point parseToPoint(String input, TimeUnit precision) {
+    if (precision == null) {
+      precision = TimeUnit.NANOSECONDS;
+    }
+    CharStream charStream = new ANTLRInputStream(input);
+    InfluxLineProtocolLexer lexer = new InfluxLineProtocolLexer(charStream);
+    TokenStream tokenStream = new CommonTokenStream(lexer);
+    InfluxLineProtocolParser parser = new InfluxLineProtocolParser(tokenStream);
+
+    List<InfluxLineProtocolParser.LineContext> lines = parser.lines().line();
+    if (parser.getNumberOfSyntaxErrors() != 0) {
+      throw new InfluxDBException("Unable to parse line.");
+    }
+    if (lines.size() != 1) {
+      throw new InfluxDBException(
+          "Multiple lines present; unable to parse more than one per record.");
+    }
+
+    InfluxLineProtocolParser.LineContext line = lines.get(0);
+    Map<String, String> tags = new HashMap<>();
+    Map<String, Object> fields = new HashMap<>();
+
+    String measurement = parseIdentifier(line.identifier());
+    if (line.tag_set() != null) {
+      line.tag_set().tag_pair().forEach(t -> parseTag(t, tags));
+    }
+    line.field_set().field_pair().forEach(t -> parseField(t, fields));
+
+    long timestamp;
+    if (line.timestamp() != null) {
+      timestamp = parseTimestamp(line.timestamp().getText());
+    } else {
+      // The wall clock is in milliseconds; convert it so the value matches the unit that is
+      // handed to the builder below.
+      timestamp = precision.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+    }
+    return Point.measurement(measurement)
+        .tag(tags)
+        .fields(fields)
+        .time(timestamp, precision)
+        .build();
+  }
+
+  /** Stores the unescaped key and value of one tag pair in {@code out}. */
+  private static void parseTag(
+      InfluxLineProtocolParser.Tag_pairContext tag, Map<String, String> out) {
+    String key = parseIdentifier(tag.identifier(0));
+    String value = parseIdentifier(tag.identifier(1));
+    out.put(key, value);
+  }
+
+  /** Stores the unescaped key and the converted value of one field pair in {@code out}. */
+  private static void parseField(
+      InfluxLineProtocolParser.Field_pairContext field, Map<String, Object> out) {
+    String key = parseIdentifier(field.identifier());
+    InfluxLineProtocolParser.Field_valueContext valueContext = field.field_value();
+    Object value;
+    if (valueContext.NUMBER() != null) {
+      value = parseNumber(valueContext.NUMBER().getText());
+    } else if (valueContext.BOOLEAN() != null) {
+      value = parseBool(valueContext.BOOLEAN().getText());
+    } else {
+      value = parseQuotedString(valueContext.QUOTED_STRING().getText());
+    }
+    out.put(key, value);
+  }
+
+  /** Strips the surrounding double quotes and unescapes embedded escaped quotes. */
+  private static Object parseQuotedString(String text) {
+    return BACKSLASH_PATTERN.matcher(text.substring(1, text.length() - 1)).replaceAll("\"");
+  }
+
+  /**
+   * Converts a NUMBER token: a trailing {@code i} yields a {@link Long}, anything else a
+   * {@link Double}.
+   *
+   * @throws InfluxDBException if the token is not convertible, e.g. {@code 1.5i}, which the
+   *     grammar accepts
+   */
+  private static Object parseNumber(String raw) {
+    try {
+      if (raw.endsWith("i")) {
+        return Long.valueOf(raw.substring(0, raw.length() - 1));
+      }
+      return Double.valueOf(raw);
+    } catch (NumberFormatException e) {
+      throw new InfluxDBException("Unable to parse number: " + raw, e);
+    }
+  }
+
+  /**
+   * Normalizes a BOOLEAN token to the string {@code "true"} or {@code "false"}.
+   *
+   * <p>NOTE(review): the value is returned as a String, not a Boolean, so it is added to the
+   * point as a string field - confirm this is intended.
+   */
+  private static Object parseBool(String raw) {
+    char first = raw.charAt(0);
+    if (first == 't' || first == 'T') {
+      return "true";
+    } else {
+      return "false";
+    }
+  }
+
+  /** Returns the identifier text, unescaping it when it was lexed as an IDENTIFIER_STRING. */
+  private static String parseIdentifier(InfluxLineProtocolParser.IdentifierContext ctx) {
+    if (ctx.BOOLEAN() != null || ctx.NUMBER() != null) {
+      return ctx.getText();
+    }
+
+    return IDENTIFIER_PATTERN.matcher(ctx.IDENTIFIER_STRING().getText()).replaceAll("$1");
+  }
+
+  /**
+   * Parses the timestamp text exactly as written; the caller supplies the unit.
+   *
+   * @throws InfluxDBException if the text is not a valid long, e.g. {@code 1.5} or {@code 10i},
+   *     which the NUMBER token accepts
+   */
+  private static long parseTimestamp(String timestamp) {
+    try {
+      return Long.parseLong(timestamp);
+    } catch (NumberFormatException e) {
+      throw new InfluxDBException("Unable to parse timestamp: " + timestamp, e);
+    }
+  }
+}
diff --git
a/influxdb-protocol/src/test/java/org/apache/iotdb/influxdb/integration/IoTDBInfluxDBIT.java
b/influxdb-protocol/src/test/java/org/apache/iotdb/influxdb/integration/IoTDBInfluxDBIT.java
index e02b5ef..7a1e0aa 100644
---
a/influxdb-protocol/src/test/java/org/apache/iotdb/influxdb/integration/IoTDBInfluxDBIT.java
+++
b/influxdb-protocol/src/test/java/org/apache/iotdb/influxdb/integration/IoTDBInfluxDBIT.java
@@ -201,6 +201,7 @@ public class IoTDBInfluxDBIT {
assertEquals(expected[i], actual.toString());
}
}
+ sessionDataSet.closeOperationHandle();
} catch (StatementExecutionException | IoTDBConnectionException e) {
throw new InfluxDBException(e.getMessage());
}
@@ -219,6 +220,7 @@ public class IoTDBInfluxDBIT {
assertEquals(expected[i], fields.get(i).toString());
}
}
+ sessionDataSet.closeOperationHandle();
} catch (StatementExecutionException | IoTDBConnectionException e) {
throw new InfluxDBException(e.getMessage());
}
@@ -237,6 +239,7 @@ public class IoTDBInfluxDBIT {
assertEquals(expected[i], fields.get(i).toString());
}
}
+ sessionDataSet.closeOperationHandle();
} catch (StatementExecutionException | IoTDBConnectionException e) {
throw new InfluxDBException(e.getMessage());
}
diff --git
a/influxdb-protocol/src/test/java/org/apache/iotdb/influxdb/protocol/input/InfluxLineParserTest.java
b/influxdb-protocol/src/test/java/org/apache/iotdb/influxdb/protocol/input/InfluxLineParserTest.java
new file mode 100644
index 0000000..a1a7e7f
--- /dev/null
+++
b/influxdb-protocol/src/test/java/org/apache/iotdb/influxdb/protocol/input/InfluxLineParserTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.influxdb.protocol.input;
+
+import org.influxdb.dto.Point;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+
+/** Round-trip tests: each record must parse into a point that serializes back to the same text. */
+public class InfluxLineParserTest {
+
+  /** Records shared by both tests. */
+  private static final String[] RECORDS = {
+    "student,name=xie,sex=m country=\"china\",score=87.0,tel=\"110\" 1635177018815000000",
+    "student,name=xie,sex=m country=\"china\",score=87i,tel=990i 1635187018815000000",
+    "cpu,name=xie country=\"china\",score=100.0 1635187018815000000"
+  };
+
+  @Test
+  public void parseToPointTest() {
+    for (String record : RECORDS) {
+      Assert.assertEquals(record, InfluxLineParser.parseToPoint(record).lineProtocol());
+    }
+  }
+
+  @Test
+  public void parserRecordsToPoints() {
+    // Copy into a list instead of downcasting: the parser only promises a Collection.
+    ArrayList<Point> points =
+        new ArrayList<>(InfluxLineParser.parserRecordsToPoints(String.join("\n", RECORDS)));
+    Assert.assertEquals(RECORDS.length, points.size());
+    for (int i = 0; i < RECORDS.length; i++) {
+      Assert.assertEquals(RECORDS[i], points.get(i).lineProtocol());
+    }
+  }
+}