This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new deed9c62c3 [BugFix] [InfluxDBSource] Resolve invalid SQL in
initColumnsIndex method caused by direct QUERY_LIMIT appendage with 'tz'
function. (#4829)
deed9c62c3 is described below
commit deed9c62c36b980560a4926c4bbc6732d0ac71ac
Author: zhengyuan <[email protected]>
AuthorDate: Mon Nov 27 10:28:08 2023 +0800
[BugFix] [InfluxDBSource] Resolve invalid SQL in initColumnsIndex method
caused by direct QUERY_LIMIT appendage with 'tz' function. (#4829)
---
.../seatunnel/influxdb/source/InfluxDBSource.java | 23 ++++++++-
.../e2e/connector/influxdb/InfluxdbIT.java | 34 +++++++++++++
.../resources/influxdb-to-influxdb-with-tz.conf | 56 ++++++++++++++++++++++
3 files changed, 112 insertions(+), 1 deletion(-)
diff --git
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
index c979653525..b5380d916f 100644
---
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
+++
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
@@ -51,6 +51,8 @@ import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig.SQL;
@@ -128,7 +130,16 @@ public class InfluxDBSource
private List<Integer> initColumnsIndex(InfluxDB influxdb) {
// query one row to get column info
- String query = sourceConfig.getSql() + QUERY_LIMIT;
+ String sql = sourceConfig.getSql();
+ String query = sql + QUERY_LIMIT;
+ // if sql contains tz(), can't be append QUERY_LIMIT at last . see bug
#4231
+ int start = containTzFunction(sql.toLowerCase());
+ if (start > 0) {
+ StringBuilder tmpSql = new StringBuilder(sql);
+ tmpSql.insert(start - 1, QUERY_LIMIT).append(" ");
+ query = tmpSql.toString();
+ }
+
try {
QueryResult queryResult = influxdb.query(new Query(query,
sourceConfig.getDatabase()));
@@ -145,4 +156,14 @@ public class InfluxDBSource
e);
}
}
+
+ private static int containTzFunction(String sql) {
+ Pattern pattern = Pattern.compile("tz\\(.*\\)");
+ Matcher matcher = pattern.matcher(sql);
+ if (matcher.find()) {
+ int start = matcher.start();
+ return start;
+ }
+ return -1;
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java
index d666796622..ddc7afadac 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java
@@ -208,6 +208,40 @@ public class InfluxdbIT extends TestSuiteBase implements
TestResource {
}
}
+ @TestTemplate
+ public void testInfluxdbWithTz(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
+ container.executeJob("/influxdb-to-influxdb-with-tz.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ String sourceSql =
+ String.format("select * from %s order by time",
INFLUXDB_SOURCE_MEASUREMENT);
+ String sinkSql = String.format("select * from %s order by time",
INFLUXDB_SINK_MEASUREMENT);
+ QueryResult sourceQueryResult = influxDB.query(new Query(sourceSql,
INFLUXDB_DATABASE));
+ QueryResult sinkQueryResult = influxDB.query(new Query(sinkSql,
INFLUXDB_DATABASE));
+ // assert data count
+ Assertions.assertEquals(
+ sourceQueryResult.getResults().size(),
sinkQueryResult.getResults().size());
+ // assert data values
+ List<List<Object>> sourceValues =
+
sourceQueryResult.getResults().get(0).getSeries().get(0).getValues();
+ List<List<Object>> sinkValues =
+
sinkQueryResult.getResults().get(0).getSeries().get(0).getValues();
+ int rowSize = sourceValues.size();
+ int colSize = sourceValues.get(0).size();
+
+ for (int row = 0; row < rowSize; row++) {
+ for (int col = 0; col < colSize; col++) {
+ Object sourceColValue = sourceValues.get(row).get(col);
+ Object sinkColValue = sinkValues.get(row).get(col);
+
+ if (!Objects.deepEquals(sourceColValue, sinkColValue)) {
+ Assertions.assertEquals(sourceColValue, sinkColValue);
+ }
+ }
+ }
+ }
+
private void initializeInfluxDBClient() throws ConnectException {
InfluxDBConfig influxDBConfig = new InfluxDBConfig(influxDBConnectUrl);
influxDB = InfluxDBClient.getInfluxDB(influxDBConfig);
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb-with-tz.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb-with-tz.conf
new file mode 100644
index 0000000000..4b7666130d
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb-with-tz.conf
@@ -0,0 +1,56 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ InfluxDB {
+ url = "http://influxdb-host:8086"
+ sql = "select label, c_string, c_double, c_bigint, c_float, c_int,
c_smallint, c_boolean from source tz('Asia/Shanghai')"
+ database = "test"
+ schema {
+ fields {
+ label = STRING
+ c_string = STRING
+ c_double = DOUBLE
+ c_bigint = BIGINT
+ c_float = FLOAT
+ c_int = INT
+ c_smallint = SMALLINT
+ c_boolean = BOOLEAN
+ time = BIGINT
+ }
+ }
+ }
+}
+
+transform {
+}
+
+sink {
+ InfluxDB {
+ url = "http://influxdb-host:8086"
+ database = "test"
+ measurement = "sink"
+ key_time = "time"
+ key_tags = ["label"]
+ batch_size = 1
+ }
+}
\ No newline at end of file