Hisoka-X commented on code in PR #8086: URL: https://github.com/apache/seatunnel/pull/8086#discussion_r1849631949
########## seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/CreateTableParser.java: ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.util; + +import lombok.Getter; + +import java.util.HashMap; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class CreateTableParser { + + private static final Pattern COLUMN_PATTERN = Pattern.compile("`?(\\w+)`?\\s*([\\w|\\W]*)"); Review Comment: ditto ########## seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtil.java: ########## @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.util; + +import org.apache.seatunnel.api.sink.SaveModePlaceHolder; +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.catalog.ClickhouseTypeConverter; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig; +import org.apache.seatunnel.connectors.seatunnel.common.sql.template.SqlTemplate; + +import org.apache.commons.lang3.StringUtils; + +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static com.google.common.base.Preconditions.checkNotNull; + +public class ClickhouseCatalogUtil { + + public static String getCreateTableSql( + String template, String database, String table, TableSchema tableSchema) { + String primaryKey = ""; + if (tableSchema.getPrimaryKey() != null) { + primaryKey = + tableSchema.getPrimaryKey().getColumnNames().stream() + .map(r -> "`" + r + "`") + .collect(Collectors.joining(",")); + } + String uniqueKey = ""; + if (!tableSchema.getConstraintKeys().isEmpty()) { + uniqueKey = + tableSchema.getConstraintKeys().stream() + .flatMap(c -> c.getColumnNames().stream()) + .map(r -> "`" + r.getColumnName() + "`") + .collect(Collectors.joining(",")); + } + SqlTemplate.canHandledByTemplateWithPlaceholder( + template, + 
SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder(), + primaryKey, + TablePath.of(database, table).getFullName(), + ClickhouseConfig.SAVE_MODE_CREATE_TEMPLATE.key()); + template = + template.replaceAll( Review Comment: How about moving the template parsing into a common module? More than one connector uses the same logic. ########## seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java: ########## @@ -63,6 +63,7 @@ public class ClickhouseSinkWriter this.proxy = new ClickhouseProxy(option.getShardMetadata().getDefaultShard().getNode()); this.shardRouter = new ShardRouter(proxy, option.getShardMetadata()); this.statementMap = initStatementMap(); + System.out.println(123); Review Comment: Please revert this debug statement. ########## seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/catalog/ClickhouseCatalog.java: ########## @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.catalog; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.Catalog; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.catalog.PreviewResult; +import org.apache.seatunnel.api.table.catalog.PrimaryKey; +import org.apache.seatunnel.api.table.catalog.SQLPreviewResult; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.catalog.exception.CatalogException; +import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException; +import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException; +import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException; +import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseCatalogUtil; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseProxy; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.TypeConvertUtil; + +import org.apache.commons.lang3.StringUtils; + +import com.clickhouse.client.ClickHouseColumn; +import com.clickhouse.client.ClickHouseNode; +import com.google.common.base.Preconditions; +import lombok.extern.slf4j.Slf4j; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.ExecutionException; + +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_CONFIG; 
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SAVE_MODE_CREATE_TEMPLATE; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME; +import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument; + +@Slf4j +public class ClickhouseCatalog implements Catalog { + + protected String defaultDatabase = "information_schema"; + private ReadonlyConfig readonlyConfig; + private ClickhouseProxy proxy; + private final String template; + + private String catalogName; + + public ClickhouseCatalog(ReadonlyConfig readonlyConfig, String catalogName) { + this.readonlyConfig = readonlyConfig; + this.catalogName = catalogName; + this.template = readonlyConfig.get(SAVE_MODE_CREATE_TEMPLATE); + } + + @Override + public List<String> listDatabases() throws CatalogException { + return proxy.listDatabases(); + } + + @Override + public List<String> listTables(String databaseName) + throws CatalogException, DatabaseNotExistException { + if (!databaseExists(databaseName)) { + throw new DatabaseNotExistException(this.catalogName, databaseName); + } + + return proxy.listTable(databaseName); + } + + @Override + public CatalogTable getTable(TablePath tablePath) + throws CatalogException, TableNotExistException { + if (!tableExists(tablePath)) { + throw new TableNotExistException(catalogName, tablePath); + } + List<ClickHouseColumn> clickHouseColumns = + proxy.getClickHouseColumns(tablePath.getFullNameWithQuoted()); + + try { + Optional<PrimaryKey> primaryKey = + proxy.getPrimaryKey(tablePath.getDatabaseName(), tablePath.getTableName()); + + TableSchema.Builder builder = TableSchema.builder(); + primaryKey.ifPresent(builder::primaryKey); + buildColumnsWithErrorCheck( + tablePath, + 
builder, + clickHouseColumns.iterator(), + column -> + PhysicalColumn.of( + column.getColumnName(), + TypeConvertUtil.convert(column), + (long) column.getEstimatedLength(), + column.getScale(), + column.isNullable(), + null, + null)); + + TableIdentifier tableIdentifier = + TableIdentifier.of( + catalogName, tablePath.getDatabaseName(), tablePath.getTableName()); + return CatalogTable.of( + tableIdentifier, + builder.build(), + buildConnectorOptions(tablePath), + Collections.emptyList(), + ""); + } catch (Exception e) { + throw new CatalogException( + String.format("Failed getting table %s", tablePath.getFullName()), e); + } + } + + @Override + public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + log.debug("Create table :{}.{}", tablePath.getDatabaseName(), tablePath.getTableName()); + proxy.createTable( + tablePath.getDatabaseName(), + tablePath.getTableName(), + template, + table.getTableSchema()); + } + + @Override + public void dropTable(TablePath tablePath, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + proxy.dropTable(tablePath.getDatabaseName(), tablePath.getTableName()); + } + + @Override + public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + try { + if (tableExists(tablePath)) { + proxy.truncateTable(tablePath.getDatabaseName(), tablePath.getTableName()); + } + } catch (Exception e) { + throw new CatalogException("Truncate table failed", e); + } + } + + @Override + public void executeSql(TablePath tablePath, String sql) { + try { + proxy.executeSql(sql); + } catch (Exception e) { + throw new CatalogException(String.format("Failed EXECUTE SQL in catalog %s", sql), e); + } + } + + @Override + public boolean isExistsData(TablePath tablePath) { + try { + return proxy.isExistsData(tablePath.getFullName()); + } catch (ExecutionException 
| InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Override + public void createDatabase(TablePath tablePath, boolean ignoreIfExists) + throws DatabaseAlreadyExistException, CatalogException { + proxy.createDatabase(tablePath.getDatabaseName()); + } + + @Override + public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists) + throws DatabaseNotExistException, CatalogException { + proxy.dropDatabase(tablePath.getDatabaseName()); + } + + @SuppressWarnings("MagicNumber") + private Map<String, String> buildConnectorOptions(TablePath tablePath) { + Map<String, String> options = new HashMap<>(8); + options.put("connector", "clickhouse"); + options.put("host", readonlyConfig.get(HOST)); + options.put("database", tablePath.getDatabaseName()); + options.put("username", readonlyConfig.get(USERNAME)); + options.put("password", readonlyConfig.get(PASSWORD)); Review Comment: please do not put username/password into options. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
