This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ad6a4b  [IOTDB-1880] Compatibility of Apache IoTDB with InfluxDB - Data Record Insertion (#4231)
5ad6a4b is described below

commit 5ad6a4b040b1bd87bcb1f2b5a89c8ca9f89e5bf9
Author: Xieqijun <[email protected]>
AuthorDate: Thu Oct 28 10:04:05 2021 +0800

    [IOTDB-1880] Compatibility of Apache IoTDB with InfluxDB - Data Record Insertion (#4231)
---
 .../influxdb/protocol/input/InfluxLineProtocol.g4  |  93 ++++++++++++
 .../org/apache/iotdb/influxdb/IoTDBInfluxDB.java   |  31 ++--
 .../iotdb/influxdb/example/InfluxDBExample.java    |   2 +-
 .../influxdb/protocol/input/InfluxLineParser.java  | 165 +++++++++++++++++++++
 .../influxdb/integration/IoTDBInfluxDBIT.java      |   3 +
 .../protocol/input/InfluxLineParserTest.java       |  55 +++++++
 6 files changed, 339 insertions(+), 10 deletions(-)

diff --git a/influxdb-protocol/src/main/antlr4/org/apache/iotdb/influxdb/protocol/input/InfluxLineProtocol.g4 b/influxdb-protocol/src/main/antlr4/org/apache/iotdb/influxdb/protocol/input/InfluxLineProtocol.g4
new file mode 100644
index 0000000..9871622
--- /dev/null
+++ b/influxdb-protocol/src/main/antlr4/org/apache/iotdb/influxdb/protocol/input/InfluxLineProtocol.g4
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ /**
+  *     This file contains code copied from the
+  *     
href=https://github.com/apache/druid/blob/master/extensions-contrib/influx-extensions/src/main/antlr4/org/apache/druid/data/input/influx/InfluxLineProtocol.g4>
+  *     licensed under the Apache License, Version 2.0.
+ **/
+
+grammar InfluxLineProtocol;
+
+lines
+    : line ('\n' line)* '\n'? EOF
+;
+
+line
+    : identifier (',' tag_set)? ' ' field_set (' ' timestamp)?
+;
+
+timestamp
+    : NUMBER
+;
+
+field_set
+    : field_pair (',' field_pair)*
+;
+
+tag_set
+    : tag_pair (',' tag_pair)*
+;
+
+tag_pair
+    : identifier '=' identifier
+;
+
+field_pair
+    : identifier '=' field_value
+;
+
+identifier
+    : IDENTIFIER_STRING | NUMBER | BOOLEAN
+;
+
+field_value
+    : QUOTED_STRING | NUMBER | BOOLEAN
+;
+
+eol
+    : NEWLINE | EOF
+;
+
+NEWLINE
+    : '\n'
+;
+
+NUMBER
+    : '-'? INT ('.' [0-9] +) ? 'i'?
+;
+
+BOOLEAN
+    : 'TRUE' | 'true' | 'True' | 't' | 'T' | 'FALSE' | 'False' | 'false' | 'F' 
| 'f'
+;
+
+QUOTED_STRING
+    : '"' (StringFieldEscapeSequence | ~(["\\]) )* '"'
+;
+IDENTIFIER_STRING
+    : (IdentifierEscapeSequence | ~([,= \n\\]) )+
+;
+fragment IdentifierEscapeSequence
+    : '\\' [,= \\]
+;
+fragment StringFieldEscapeSequence
+    : '\\' ["\\]
+;
+fragment INT
+   : '0' | [1-9] [0-9]*
+;
\ No newline at end of file
diff --git a/influxdb-protocol/src/main/java/org/apache/iotdb/influxdb/IoTDBInfluxDB.java b/influxdb-protocol/src/main/java/org/apache/iotdb/influxdb/IoTDBInfluxDB.java
index 9cf2ba0..e199c77 100644
--- a/influxdb-protocol/src/main/java/org/apache/iotdb/influxdb/IoTDBInfluxDB.java
+++ b/influxdb-protocol/src/main/java/org/apache/iotdb/influxdb/IoTDBInfluxDB.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.influxdb;
 
 import org.apache.iotdb.influxdb.protocol.constant.InfluxDBConstant;
 import org.apache.iotdb.influxdb.protocol.impl.IoTDBInfluxDBService;
+import org.apache.iotdb.influxdb.protocol.input.InfluxLineParser;
 import org.apache.iotdb.influxdb.protocol.util.ParameterUtils;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
@@ -30,7 +31,11 @@ import org.apache.iotdb.session.SessionDataSet;
 import org.influxdb.BatchOptions;
 import org.influxdb.InfluxDB;
 import org.influxdb.InfluxDBException;
-import org.influxdb.dto.*;
+import org.influxdb.dto.BatchPoints;
+import org.influxdb.dto.Point;
+import org.influxdb.dto.Pong;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
 import org.influxdb.impl.TimeUtil;
 
 import java.net.URI;
@@ -120,12 +125,12 @@ public class IoTDBInfluxDB implements InfluxDB {
 
   @Override
   public void write(final String records) {
-    throw new 
UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED);
+    write(null, null, null, null, records);
   }
 
   @Override
   public void write(final List<String> records) {
-    throw new 
UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED);
+    write(String.join("\n", records));
   }
 
   @Override
@@ -134,7 +139,7 @@ public class IoTDBInfluxDB implements InfluxDB {
       final String retentionPolicy,
       final ConsistencyLevel consistency,
       final String records) {
-    throw new 
UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED);
+    write(database, retentionPolicy, consistency, null, records);
   }
 
   @Override
@@ -144,7 +149,14 @@ public class IoTDBInfluxDB implements InfluxDB {
       final ConsistencyLevel consistency,
       final TimeUnit precision,
       final String records) {
-    throw new 
UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED);
+    BatchPoints batchPoints =
+        BatchPoints.database(database)
+            .retentionPolicy(retentionPolicy)
+            .consistency(consistency)
+            .precision(precision)
+            .points(InfluxLineParser.parserRecordsToPoints(records, precision))
+            .build();
+    write(batchPoints);
   }
 
   @Override
@@ -153,7 +165,7 @@ public class IoTDBInfluxDB implements InfluxDB {
       final String retentionPolicy,
       final ConsistencyLevel consistency,
       final List<String> records) {
-    throw new 
UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED);
+    write(database, retentionPolicy, consistency, null, String.join("\n", 
records));
   }
 
   @Override
@@ -163,17 +175,17 @@ public class IoTDBInfluxDB implements InfluxDB {
       final ConsistencyLevel consistency,
       final TimeUnit precision,
       final List<String> records) {
-    throw new 
UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED);
+    write(database, retentionPolicy, consistency, precision, String.join("\n", 
records));
   }
 
   @Override
   public void write(final int udpPort, final String records) {
-    throw new 
UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED);
+    write(records);
   }
 
   @Override
   public void write(final int udpPort, final List<String> records) {
-    throw new 
UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED);
+    write(String.join("\n", records));
   }
 
   @Override
@@ -414,6 +426,7 @@ public class IoTDBInfluxDB implements InfluxDB {
       while (sessionDataSet.hasNext()) {
         version = sessionDataSet.next().getFields().get(0).getStringValue();
       }
+      sessionDataSet.closeOperationHandle();
       return version;
     } catch (StatementExecutionException | IoTDBConnectionException e) {
       throw new InfluxDBException(e.getMessage());
diff --git a/influxdb-protocol/src/main/java/org/apache/iotdb/influxdb/example/InfluxDBExample.java b/influxdb-protocol/src/main/java/org/apache/iotdb/influxdb/example/InfluxDBExample.java
index 4b8904d..e8f01d5 100644
--- a/influxdb-protocol/src/main/java/org/apache/iotdb/influxdb/example/InfluxDBExample.java
+++ b/influxdb-protocol/src/main/java/org/apache/iotdb/influxdb/example/InfluxDBExample.java
@@ -48,7 +48,7 @@ public class InfluxDBExample {
     Map<String, Object> fields = new HashMap<>();
     tags.put("name", "xie");
     tags.put("sex", "m");
-    fields.put("score", 87);
+    fields.put("score", 87.0);
     fields.put("tel", "110");
     fields.put("country", "china");
     builder.tag(tags);
diff --git a/influxdb-protocol/src/main/java/org/apache/iotdb/influxdb/protocol/input/InfluxLineParser.java b/influxdb-protocol/src/main/java/org/apache/iotdb/influxdb/protocol/input/InfluxLineParser.java
new file mode 100644
index 0000000..5f74111
--- /dev/null
+++ b/influxdb-protocol/src/main/java/org/apache/iotdb/influxdb/protocol/input/InfluxLineParser.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * This is an InfluxDB line protocol parser.
+ *
+ * @see <a
+ *     
href=https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_reference/";>Line
+ *     Protocol</a> This class contains code copied from the <a
+ *     
href=https://github.com/apache/druid/blob/master/extensions-contrib/influx-extensions/src/main/java/org/apache/druid/data/input/influx/InfluxParser.java>
+ *     Apache Druid InfluxDB Parser </a>, licensed under the Apache License, 
Version 2.0.
+ */
+package org.apache.iotdb.influxdb.protocol.input;
+
+import org.antlr.v4.runtime.*;
+import org.influxdb.InfluxDBException;
+import org.influxdb.dto.Point;
+
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+public class InfluxLineParser {
+
+  private static final Pattern BACKSLASH_PATTERN = Pattern.compile("\\\\\"");
+  private static final Pattern IDENTIFIER_PATTERN = Pattern.compile("\\\\([,= 
])");
+
+  public static Collection<Point> parserRecordsToPoints(String records) {
+    return parserRecordsToPoints(records, null);
+  }
+
+  public static Collection<Point> parserRecordsToPoints(String records, 
TimeUnit precision) {
+    if (precision == null) {
+      precision = TimeUnit.NANOSECONDS;
+    }
+    ArrayList<Point> points = new ArrayList<>();
+    String[] recordsSplit = records.split("\n");
+    for (String record : recordsSplit) {
+      points.add(parseToPoint(record, precision));
+    }
+    return points;
+  }
+
+  public static Point parseToPoint(String input) {
+    return parseToPoint(input, null);
+  }
+
+  public static Point parseToPoint(String input, TimeUnit precision) {
+    if (precision == null) {
+      precision = TimeUnit.NANOSECONDS;
+    }
+    CharStream charStream = new ANTLRInputStream(input);
+    InfluxLineProtocolLexer lexer = new InfluxLineProtocolLexer(charStream);
+    TokenStream tokenStream = new CommonTokenStream(lexer);
+    InfluxLineProtocolParser parser = new 
InfluxLineProtocolParser(tokenStream);
+
+    List<InfluxLineProtocolParser.LineContext> lines = parser.lines().line();
+    if (parser.getNumberOfSyntaxErrors() != 0) {
+      throw new InfluxDBException("Unable to parse line.");
+    }
+    if (lines.size() != 1) {
+      throw new InfluxDBException(
+          "Multiple lines present; unable to parse more than one per record.");
+    }
+
+    Point.Builder builder;
+    Map<String, String> tags = new HashMap<>();
+    Map<String, Object> fields = new HashMap<>();
+    Long timestamp = null;
+
+    InfluxLineProtocolParser.LineContext line = lines.get(0);
+    String measurement = parseIdentifier(line.identifier());
+
+    builder = Point.measurement(measurement);
+    if (line.tag_set() != null) {
+      line.tag_set().tag_pair().forEach(t -> parseTag(t, tags));
+    }
+    line.field_set().field_pair().forEach(t -> parseField(t, fields));
+
+    if (line.timestamp() != null) {
+      String timestampString = line.timestamp().getText();
+      timestamp = parseTimestamp(timestampString);
+    }
+    if (timestamp == null) {
+      timestamp = System.currentTimeMillis();
+    }
+    return builder.tag(tags).fields(fields).time(timestamp, precision).build();
+  }
+
+  private static void parseTag(
+      InfluxLineProtocolParser.Tag_pairContext tag, Map<String, String> out) {
+    String key = parseIdentifier(tag.identifier(0));
+    String value = parseIdentifier(tag.identifier(1));
+    out.put(key, value);
+  }
+
+  private static void parseField(
+      InfluxLineProtocolParser.Field_pairContext field, Map<String, Object> 
out) {
+    String key = parseIdentifier(field.identifier());
+    InfluxLineProtocolParser.Field_valueContext valueContext = 
field.field_value();
+    Object value;
+    if (valueContext.NUMBER() != null) {
+      value = parseNumber(valueContext.NUMBER().getText());
+    } else if (valueContext.BOOLEAN() != null) {
+      value = parseBool(valueContext.BOOLEAN().getText());
+    } else {
+      value = parseQuotedString(valueContext.QUOTED_STRING().getText());
+    }
+    out.put(key, value);
+  }
+
+  private static Object parseQuotedString(String text) {
+    return BACKSLASH_PATTERN.matcher(text.substring(1, text.length() - 
1)).replaceAll("\"");
+  }
+
+  private static Object parseNumber(String raw) {
+    if (raw.endsWith("i")) {
+      return Long.valueOf(raw.substring(0, raw.length() - 1));
+    }
+
+    return new Double(raw);
+  }
+
+  private static Object parseBool(String raw) {
+    char first = raw.charAt(0);
+    if (first == 't' || first == 'T') {
+      return "true";
+    } else {
+      return "false";
+    }
+  }
+
+  private static String 
parseIdentifier(InfluxLineProtocolParser.IdentifierContext ctx) {
+    if (ctx.BOOLEAN() != null || ctx.NUMBER() != null) {
+      return ctx.getText();
+    }
+
+    return 
IDENTIFIER_PATTERN.matcher(ctx.IDENTIFIER_STRING().getText()).replaceAll("$1");
+  }
+
+  private static Long parseTimestamp(String timestamp) {
+    // Influx timestamps come in nanoseconds; treat anything less than 1 ms as 0
+    if (timestamp.length() < 7) {
+      return 0L;
+    } else {
+      return Long.parseLong(timestamp);
+    }
+  }
+}
diff --git a/influxdb-protocol/src/test/java/org/apache/iotdb/influxdb/integration/IoTDBInfluxDBIT.java b/influxdb-protocol/src/test/java/org/apache/iotdb/influxdb/integration/IoTDBInfluxDBIT.java
index e02b5ef..7a1e0aa 100644
--- a/influxdb-protocol/src/test/java/org/apache/iotdb/influxdb/integration/IoTDBInfluxDBIT.java
+++ b/influxdb-protocol/src/test/java/org/apache/iotdb/influxdb/integration/IoTDBInfluxDBIT.java
@@ -201,6 +201,7 @@ public class IoTDBInfluxDBIT {
           assertEquals(expected[i], actual.toString());
         }
       }
+      sessionDataSet.closeOperationHandle();
     } catch (StatementExecutionException | IoTDBConnectionException e) {
       throw new InfluxDBException(e.getMessage());
     }
@@ -219,6 +220,7 @@ public class IoTDBInfluxDBIT {
           assertEquals(expected[i], fields.get(i).toString());
         }
       }
+      sessionDataSet.closeOperationHandle();
     } catch (StatementExecutionException | IoTDBConnectionException e) {
       throw new InfluxDBException(e.getMessage());
     }
@@ -237,6 +239,7 @@ public class IoTDBInfluxDBIT {
           assertEquals(expected[i], fields.get(i).toString());
         }
       }
+      sessionDataSet.closeOperationHandle();
     } catch (StatementExecutionException | IoTDBConnectionException e) {
       throw new InfluxDBException(e.getMessage());
     }
diff --git a/influxdb-protocol/src/test/java/org/apache/iotdb/influxdb/protocol/input/InfluxLineParserTest.java b/influxdb-protocol/src/test/java/org/apache/iotdb/influxdb/protocol/input/InfluxLineParserTest.java
new file mode 100644
index 0000000..a1a7e7f
--- /dev/null
+++ b/influxdb-protocol/src/test/java/org/apache/iotdb/influxdb/protocol/input/InfluxLineParserTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.influxdb.protocol.input;
+
+import org.influxdb.dto.Point;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+
+public class InfluxLineParserTest {
+  @Test
+  public void parseToPointTest() {
+    String[] records = {
+      "student,name=xie,sex=m country=\"china\",score=87.0,tel=\"110\" 
1635177018815000000",
+      "student,name=xie,sex=m country=\"china\",score=87i,tel=990i 
1635187018815000000",
+      "cpu,name=xie country=\"china\",score=100.0 1635187018815000000"
+    };
+    int expectLength = 3;
+    for (int i = 0; i < expectLength; i++) {
+      Assert.assertEquals(records[i], 
InfluxLineParser.parseToPoint(records[i]).lineProtocol());
+    }
+  }
+
+  @Test
+  public void parserRecordsToPoints() {
+    String[] records = {
+      "student,name=xie,sex=m country=\"china\",score=87.0,tel=\"110\" 
1635177018815000000",
+      "student,name=xie,sex=m country=\"china\",score=87i,tel=990i 
1635187018815000000",
+      "cpu,name=xie country=\"china\",score=100.0 1635187018815000000"
+    };
+    int expectLength = 3;
+    ArrayList<Point> points =
+        (ArrayList<Point>) 
InfluxLineParser.parserRecordsToPoints(String.join("\n", records));
+    for (int i = 0; i < expectLength; i++) {
+      Assert.assertEquals(records[i], points.get(i).lineProtocol());
+    }
+  }
+}

Reply via email to