This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 9ad11d899 [INLONG-6594][Sort] Fix ClickHouse connector throw exception
when source is change log stream (#6595)
9ad11d899 is described below
commit 9ad11d8996d9148c932d7f08a79c7a77911f7cd5
Author: Xin Gong <[email protected]>
AuthorDate: Mon Nov 28 11:52:52 2022 +0800
[INLONG-6594][Sort] Fix ClickHouse connector throw exception when source is
change log stream (#6595)
---
.../sort/jdbc/dialect/ClickHouseDialect.java | 95 +++++++++++++++++++++-
.../sort/parser/ClickHouseSqlParserTest.java | 2 +-
2 files changed, 94 insertions(+), 3 deletions(-)
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/ClickHouseDialect.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/ClickHouseDialect.java
index b9d7f48b0..016f994c1 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/ClickHouseDialect.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/ClickHouseDialect.java
@@ -18,11 +18,14 @@
package org.apache.inlong.sort.jdbc.dialect;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import org.apache.inlong.sort.jdbc.converter.clickhouse.ClickHouseRowConverter;
import org.apache.inlong.sort.jdbc.table.AbstractJdbcDialect;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
@@ -36,6 +39,8 @@ import static java.lang.String.format;
*/
public class ClickHouseDialect extends AbstractJdbcDialect {
+ public static final Logger LOG = LoggerFactory.getLogger(ClickHouseDialect.class);
+
// Define MAX/MIN precision of TIMESTAMP type according to ClickHouse docs:
// https://clickhouse.com/docs/zh/sql-reference/data-types/datetime64
private static final int MAX_TIMESTAMP_PRECISION = 8;
@@ -45,6 +50,7 @@ public class ClickHouseDialect extends AbstractJdbcDialect {
// https://clickhouse.com/docs/zh/sql-reference/data-types/decimal/
private static final int MAX_DECIMAL_PRECISION = 128;
private static final int MIN_DECIMAL_PRECISION = 32;
+ private static final String POINT = ".";
@Override
public String dialectName() {
@@ -129,22 +135,46 @@ public class ClickHouseDialect extends AbstractJdbcDialect {
@Override
public String getUpdateStatement(
String tableName, String[] fieldNames, String[] conditionFields) {
+ List<String> conditionFieldList = Arrays.asList(conditionFields);
String setClause =
Arrays.stream(fieldNames)
+ .filter(fieldName -> !conditionFieldList.contains(fieldName))
.map(f -> format("%s = :%s", quoteIdentifier(f), f))
.collect(Collectors.joining(", "));
+
String conditionClause =
Arrays.stream(conditionFields)
.map(f -> format("%s = :%s", quoteIdentifier(f), f))
.collect(Collectors.joining(" AND "));
+ Pair<String, String> databaseAndTableName = getDatabaseAndTableName(tableName);
return "ALTER TABLE "
- + quoteIdentifier(tableName)
+ + quoteIdentifier(databaseAndTableName.getLeft())
+ + POINT
+ + quoteIdentifier(databaseAndTableName.getRight())
+ " UPDATE "
+ setClause
+ " WHERE "
+ conditionClause;
}
+ /**
+ * ClickHouse throw exception "Table default.test_user doesn't exist". But jdbc-url have database name.
+ * So we specify database when exec query. This method parse tableName to database and table.
+ * @param tableName include database.table
+ * @return pair left is database, right is table
+ */
+ private Pair<String, String> getDatabaseAndTableName(String tableName) {
+ String databaseName = "default";
+ if (tableName.contains(POINT)) {
+ String[] tableNameArray = tableName.split("\\.");
+ databaseName = tableNameArray[0];
+ tableName = tableNameArray[1];
+ } else {
+ LOG.warn("TableName doesn't include database name, so using default as database name");
+ }
+ return Pair.of(databaseName, tableName);
+ }
+
/**
* Get delete one row statement by condition fields
*/
@@ -154,6 +184,67 @@ public class ClickHouseDialect extends AbstractJdbcDialect {
Arrays.stream(conditionFields)
.map(f -> format("%s = :%s", quoteIdentifier(f), f))
.collect(Collectors.joining(" AND "));
- return "ALTER TABLE " + quoteIdentifier(tableName) + " DELETE WHERE " + conditionClause;
+ Pair<String, String> databaseAndTableName = getDatabaseAndTableName(tableName);
+ return "ALTER TABLE "
+ + quoteIdentifier(databaseAndTableName.getLeft())
+ + POINT
+ + quoteIdentifier(databaseAndTableName.getRight())
+ + " DELETE WHERE " + conditionClause;
+ }
+
+ @Override
+ public String getInsertIntoStatement(String tableName, String[] fieldNames) {
+ String columns =
+ Arrays.stream(fieldNames)
+ .map(this::quoteIdentifier)
+ .collect(Collectors.joining(", "));
+ String placeholders =
+ Arrays.stream(fieldNames).map(f -> ":" + f).collect(Collectors.joining(", "));
+ Pair<String, String> databaseAndTableName = getDatabaseAndTableName(tableName);
+ return "INSERT INTO "
+ + quoteIdentifier(databaseAndTableName.getLeft())
+ + POINT
+ + quoteIdentifier(databaseAndTableName.getRight())
+ + "("
+ + columns
+ + ")"
+ + " VALUES ("
+ + placeholders
+ + ")";
+ }
+
+ @Override
+ public String getSelectFromStatement(
+ String tableName, String[] selectFields, String[] conditionFields) {
+ String selectExpressions =
+ Arrays.stream(selectFields)
+ .map(this::quoteIdentifier)
+ .collect(Collectors.joining(", "));
+ String fieldExpressions =
+ Arrays.stream(conditionFields)
+ .map(f -> format("%s = :%s", quoteIdentifier(f), f))
+ .collect(Collectors.joining(" AND "));
+ Pair<String, String> databaseAndTableName = getDatabaseAndTableName(tableName);
+ return "SELECT "
+ + selectExpressions
+ + " FROM "
+ + quoteIdentifier(databaseAndTableName.getLeft())
+ + POINT
+ + quoteIdentifier(databaseAndTableName.getRight())
+ + (conditionFields.length > 0 ? " WHERE " + fieldExpressions : "");
+ }
+
+ @Override
+ public String getRowExistsStatement(String tableName, String[] conditionFields) {
+ String fieldExpressions =
+ Arrays.stream(conditionFields)
+ .map(f -> format("%s = :%s", quoteIdentifier(f), f))
+ .collect(Collectors.joining(" AND "));
+ Pair<String, String> pair = getDatabaseAndTableName(tableName);
+ return "SELECT 1 FROM "
+ + quoteIdentifier(pair.getLeft())
+ + POINT
+ + quoteIdentifier(pair.getRight())
+ + " WHERE " + fieldExpressions;
}
}
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ClickHouseSqlParserTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ClickHouseSqlParserTest.java
index ac35d491e..09e050e0e 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ClickHouseSqlParserTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ClickHouseSqlParserTest.java
@@ -77,7 +77,7 @@ public class ClickHouseSqlParserTest {
null,
1,
null,
- "ck_demo",
+ "demo.ck_demo",
"jdbc:clickhouse://localhost:8123/demo",
"default",
"",