eskabetxe commented on code in PR #49:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/49#discussion_r1209929711


##########
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableBase.java:
##########
@@ -291,4 +291,12 @@ protected <T> T getNullable(ResultSet rs, 
FunctionWithException<ResultSet, T, SQ
     protected <T> T getNullable(ResultSet rs, T value) throws SQLException {
         return rs.wasNull() ? null : value;
     }
+
+    public TableField[] getFields() {
+        return fields;
+    }
+
+    public String getName() {

Review Comment:
   already exist a getTableName()



##########
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/clickhouse/table/ClickhouseTableRow.java:
##########
@@ -0,0 +1,35 @@
+package org.apache.flink.connector.jdbc.databases.clickhouse.table;
+
+import org.apache.flink.connector.jdbc.testutils.tables.TableField;
+import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
+
+import java.util.Arrays;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** ClickhouseTableRow . */
+public class ClickhouseTableRow extends TableRow {
+
+    public ClickhouseTableRow(String name, TableField[] fields) {
+        super(name, fields);
+    }
+
+    @Override
+    public String getCreateQuery() {
+        return String.format(
+                "CREATE TABLE %s (%s) %s PRIMARY KEY (%s)",
+                getTableName(),
+                
getStreamFields().map(TableField::asString).collect(Collectors.joining(", ")),
+                "ENGINE = MergeTree",
+                getStreamFields().map(TableField::getName).findFirst().get());

Review Comment:
   you are ignoring the PKs defined in table definition with this...



##########
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/clickhouse/table/ClickhouseTableRow.java:
##########
@@ -0,0 +1,35 @@
+package org.apache.flink.connector.jdbc.databases.clickhouse.table;
+
+import org.apache.flink.connector.jdbc.testutils.tables.TableField;
+import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
+
+import java.util.Arrays;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** ClickhouseTableRow . */
+public class ClickhouseTableRow extends TableRow {
+
+    public ClickhouseTableRow(String name, TableField[] fields) {
+        super(name, fields);
+    }
+
+    @Override
+    public String getCreateQuery() {
+        return String.format(
+                "CREATE TABLE %s (%s) %s PRIMARY KEY (%s)",
+                getTableName(),
+                
getStreamFields().map(TableField::asString).collect(Collectors.joining(", ")),

Review Comment:
   as this is also used on TableBase could we create a method for it there?



##########
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableBase.java:
##########
@@ -291,4 +291,12 @@ protected <T> T getNullable(ResultSet rs, 
FunctionWithException<ResultSet, T, SQ
     protected <T> T getNullable(ResultSet rs, T value) throws SQLException {
         return rs.wasNull() ? null : value;
     }
+
+    public TableField[] getFields() {

Review Comment:
   for the use I see you should use getStreamFields() 



##########
flink-connector-jdbc/pom.xml:
##########
@@ -194,6 +194,22 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+               <!-- clickhouse tests -->
+               <dependency>
+                       <groupId>com.clickhouse</groupId>
+                       <artifactId>clickhouse-jdbc</artifactId>
+                       <version>0.4.6</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.testcontainers</groupId>
+                       <artifactId>clickhouse</artifactId>
+                       <version>1.18.1</version>

Review Comment:
   should be ${testcontainers.version}



##########
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSinkITCase.java:
##########
@@ -322,6 +325,9 @@ void testBatchSink() throws Exception {
 
     @Test
     void testReadingFromChangelogSource() throws Exception {
+        if (!getMetadata().supportUpsert()) {

Review Comment:
   Here the problem is the deletion no? (I have this same problem in Trino 
dialect)
   



##########
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/clickhouse/table/ClickhouseTableRow.java:
##########
@@ -0,0 +1,35 @@
+package org.apache.flink.connector.jdbc.databases.clickhouse.table;
+
+import org.apache.flink.connector.jdbc.testutils.tables.TableField;
+import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
+
+import java.util.Arrays;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** ClickhouseTableRow . */
+public class ClickhouseTableRow extends TableRow {
+
+    public ClickhouseTableRow(String name, TableField[] fields) {
+        super(name, fields);
+    }
+
+    @Override
+    public String getCreateQuery() {
+        return String.format(
+                "CREATE TABLE %s (%s) %s PRIMARY KEY (%s)",
+                getTableName(),
+                
getStreamFields().map(TableField::asString).collect(Collectors.joining(", ")),
+                "ENGINE = MergeTree",
+                getStreamFields().map(TableField::getName).findFirst().get());
+    }
+
+    @Override
+    protected String getDeleteFromQuery() {
+        return String.format("truncate table %s", getTableName());
+    }
+
+    private Stream<TableField> getStreamFields() {

Review Comment:
   TableBase already have a getStreamFields()



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to