This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new deed9c62c3 [BugFix] [InfluxDBSource] Resolve invalid SQL in 
initColumnsIndex method caused by direct QUERY_LIMIT appendage with 'tz' 
function. (#4829)
deed9c62c3 is described below

commit deed9c62c36b980560a4926c4bbc6732d0ac71ac
Author: zhengyuan <[email protected]>
AuthorDate: Mon Nov 27 10:28:08 2023 +0800

    [BugFix] [InfluxDBSource] Resolve invalid SQL in initColumnsIndex method 
caused by direct QUERY_LIMIT appendage with 'tz' function. (#4829)
---
 .../seatunnel/influxdb/source/InfluxDBSource.java  | 23 ++++++++-
 .../e2e/connector/influxdb/InfluxdbIT.java         | 34 +++++++++++++
 .../resources/influxdb-to-influxdb-with-tz.conf    | 56 ++++++++++++++++++++++
 3 files changed, 112 insertions(+), 1 deletion(-)

diff --git 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
index c979653525..b5380d916f 100644
--- 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
+++ 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
@@ -51,6 +51,8 @@ import lombok.extern.slf4j.Slf4j;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig.SQL;
@@ -128,7 +130,16 @@ public class InfluxDBSource
 
     private List<Integer> initColumnsIndex(InfluxDB influxdb) {
         // query one row to get column info
-        String query = sourceConfig.getSql() + QUERY_LIMIT;
+        String sql = sourceConfig.getSql();
+        String query = sql + QUERY_LIMIT;
+        // If the SQL contains tz(), QUERY_LIMIT cannot simply be appended at the end; see bug #4231.
+        int start = containTzFunction(sql.toLowerCase());
+        if (start > 0) {
+            StringBuilder tmpSql = new StringBuilder(sql);
+            tmpSql.insert(start - 1, QUERY_LIMIT).append(" ");
+            query = tmpSql.toString();
+        }
+
         try {
             QueryResult queryResult = influxdb.query(new Query(query, 
sourceConfig.getDatabase()));
 
@@ -145,4 +156,14 @@ public class InfluxDBSource
                     e);
         }
     }
+
+    /**
+     * Matches a {@code tz(...)} call that starts on a word boundary, ignoring case, so that
+     * identifiers merely ending in "tz" (for example {@code quartz(...)}) are not mistaken for it.
+     * Compiled once because the pattern never changes.
+     */
+    private static final Pattern TZ_FUNCTION_PATTERN =
+            Pattern.compile("\\btz\\(.*\\)", Pattern.CASE_INSENSITIVE);
+
+    /**
+     * Locates the first {@code tz(...)} call in the given SQL text.
+     *
+     * @param sql the SQL text to scan; must not be {@code null}
+     * @return the zero-based index at which the {@code tz(...)} call starts, or {@code -1} when
+     *     the SQL contains no such call
+     */
+    private static int containTzFunction(String sql) {
+        Matcher matcher = TZ_FUNCTION_PATTERN.matcher(sql);
+        return matcher.find() ? matcher.start() : -1;
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java
index d666796622..ddc7afadac 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java
@@ -208,6 +208,40 @@ public class InfluxdbIT extends TestSuiteBase implements 
TestResource {
         }
     }
 
+    /**
+     * Runs the {@code influxdb-to-influxdb-with-tz.conf} job and verifies that the sink
+     * measurement ends up with the same rows and column values as the source measurement.
+     */
+    @TestTemplate
+    public void testInfluxdbWithTz(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob("/influxdb-to-influxdb-with-tz.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        String sourceSql =
+                String.format("select * from %s order by time", INFLUXDB_SOURCE_MEASUREMENT);
+        String sinkSql = String.format("select * from %s order by time", INFLUXDB_SINK_MEASUREMENT);
+        QueryResult sourceQueryResult = influxDB.query(new Query(sourceSql, INFLUXDB_DATABASE));
+        QueryResult sinkQueryResult = influxDB.query(new Query(sinkSql, INFLUXDB_DATABASE));
+        Assertions.assertEquals(
+                sourceQueryResult.getResults().size(), sinkQueryResult.getResults().size());
+
+        List<List<Object>> sourceValues =
+                sourceQueryResult.getResults().get(0).getSeries().get(0).getValues();
+        List<List<Object>> sinkValues =
+                sinkQueryResult.getResults().get(0).getSeries().get(0).getValues();
+        // Compare the actual row counts: the results-list size above does not reflect how many
+        // rows were written, and a shorter sink would otherwise fail with an index error.
+        Assertions.assertEquals(sourceValues.size(), sinkValues.size(), "row count mismatch");
+
+        for (int row = 0; row < sourceValues.size(); row++) {
+            List<Object> sourceRow = sourceValues.get(row);
+            List<Object> sinkRow = sinkValues.get(row);
+            Assertions.assertEquals(
+                    sourceRow.size(), sinkRow.size(), "column count mismatch at row " + row);
+            for (int col = 0; col < sourceRow.size(); col++) {
+                Object sourceColValue = sourceRow.get(col);
+                Object sinkColValue = sinkRow.get(col);
+                Assertions.assertTrue(
+                        Objects.deepEquals(sourceColValue, sinkColValue),
+                        "value mismatch at row "
+                                + row
+                                + ", column "
+                                + col
+                                + ": expected "
+                                + sourceColValue
+                                + " but was "
+                                + sinkColValue);
+            }
+        }
+    }
+
     private void initializeInfluxDBClient() throws ConnectException {
         InfluxDBConfig influxDBConfig = new InfluxDBConfig(influxDBConnectUrl);
         influxDB = InfluxDBClient.getInfluxDB(influxDBConfig);
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb-with-tz.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb-with-tz.conf
new file mode 100644
index 0000000000..4b7666130d
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb-with-tz.conf
@@ -0,0 +1,56 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  InfluxDB {
+    url = "http://influxdb-host:8086"
+    sql = "select label, c_string, c_double, c_bigint, c_float, c_int, c_smallint, c_boolean from source tz('Asia/Shanghai')"
+    database = "test"
+    schema {
+      fields {
+        label = STRING
+        c_string = STRING
+        c_double = DOUBLE
+        c_bigint = BIGINT
+        c_float = FLOAT
+        c_int = INT
+        c_smallint = SMALLINT
+        c_boolean = BOOLEAN
+        time = BIGINT
+      }
+    }
+  }
+}
+
+transform {
+}
+
+sink {
+  InfluxDB {
+    url = "http://influxdb-host:8086"
+    database = "test"
+    measurement = "sink"
+    key_time = "time"
+    key_tags = ["label"]
+    batch_size = 1
+  }
+}
\ No newline at end of file

Reply via email to