This is an automated email from the ASF dual-hosted git repository. gaojun2048 pushed a commit to branch revert-4477-delete-catalog in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
commit 5f106e2cdb56ea59417b3e19a2975a3da9eff9b2 Author: Eric <[email protected]> AuthorDate: Thu Apr 13 18:44:45 2023 +0800 Revert "[chore] delete unavailable S3 & Kafka Catalogs (#4477)" This reverts commit e0aec5ecec3dac6d0b1f32b0feab763d36322aca. --- .../seatunnel/file/s3/catalog/S3Catalog.java | 200 ++++++++++++++++ .../file/s3/catalog/S3CatalogFactory.java | 48 ++++ .../file/s3/catalog/S3DataTypeConvertor.java | 63 +++++ .../seatunnel/kafka/catalog/KafkaCatalog.java | 265 +++++++++++++++++++++ .../kafka/catalog/KafkaCatalogFactory.java | 43 ++++ .../kafka/catalog/KafkaDataTypeConvertor.java | 72 ++++++ .../kafka/catalog/KafkaDataTypeConvertorTest.java | 46 ++++ 7 files changed, 737 insertions(+) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3Catalog.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3Catalog.java new file mode 100644 index 000000000..816597f05 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3Catalog.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.file.s3.catalog; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import org.apache.seatunnel.api.table.catalog.Catalog; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.catalog.exception.CatalogException; +import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException; +import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException; +import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException; +import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException; +import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Conf; +import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Config; +import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy; +import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory; + +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileSystem; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +import static org.apache.seatunnel.shade.hadoop.com.google.common.base.Preconditions.checkNotNull; + +/** + * S3 catalog implementation. + * + * <p>The given path directory is the database/table. + */ +public class S3Catalog implements Catalog { + + private static final Logger LOGGER = LoggerFactory.getLogger(S3Catalog.class); + + private final String catalogName; + private final Config s3Config; + + private String defaultDatabase; + private FileSystem fileSystem; + + public S3Catalog(String catalogName, Config s3Config) { + this.catalogName = checkNotNull(catalogName, "catalogName cannot be null"); + this.s3Config = checkNotNull(s3Config, "s3Config cannot be null"); + } + + @Override + public void open() throws CatalogException { + ReadStrategy readStrategy = + ReadStrategyFactory.of(s3Config.getString(S3Config.FILE_FORMAT_TYPE.key())); + readStrategy.setPluginConfig(s3Config); + this.defaultDatabase = s3Config.getString(S3Config.FILE_PATH.key()); + readStrategy = ReadStrategyFactory.of(s3Config.getString(S3Config.FILE_FORMAT_TYPE.key())); + readStrategy.setPluginConfig(s3Config); + try { + fileSystem = + FileSystem.get(readStrategy.getConfiguration(S3Conf.buildWithConfig(s3Config))); + } catch (IOException e) { + throw new CatalogException("Open S3Catalog failed", e); + } + LOGGER.info("S3Catalog {} is opened", catalogName); + } + + @Override + public void close() throws CatalogException { + LOGGER.info("S3Catalog {} is closed", catalogName); + } + + @Override + public String getDefaultDatabase() throws CatalogException { + return defaultDatabase; + } + + @Override + public boolean databaseExists(String databaseName) throws CatalogException { + // check if the directory exists + try { + return fileSystem + .getFileStatus(new org.apache.hadoop.fs.Path(databaseName)) + .isDirectory(); + } catch (FileNotFoundException e) { + LOGGER.debug("Database {} does not exist", databaseName, e); + return false; + } catch (Exception ex) { + throw new CatalogException("Check database exists failed", ex); + } + } + + @Override + public List<String> listDatabases() throws CatalogException { + // todo: Do we need to find all sub directory as database? + if (databaseExists(defaultDatabase)) { + return Lists.newArrayList(defaultDatabase); + } + return Collections.emptyList(); + } + + @Override + public List<String> listTables(String databaseName) + throws CatalogException, DatabaseNotExistException { + if (databaseExists(databaseName)) { + return Lists.newArrayList(databaseName); + } + return Collections.emptyList(); + } + + @Override + public boolean tableExists(TablePath tablePath) throws CatalogException { + checkNotNull(tablePath, "tablePath cannot be null"); + return databaseExists(tablePath.getTableName()); + } + + @Override + public CatalogTable getTable(TablePath tablePath) + throws CatalogException, TableNotExistException { + checkNotNull(tablePath, "tablePath cannot be null"); + TableIdentifier tableIdentifier = + TableIdentifier.of( + catalogName, tablePath.getDatabaseName(), tablePath.getTableName()); + // todo: + TableSchema tableSchema = TableSchema.builder().build(); + return CatalogTable.of( + tableIdentifier, tableSchema, Collections.emptyMap(), Collections.emptyList(), ""); + } + + @Override + public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + createDatabase(tablePath, ignoreIfExists); + } + + @Override + public void dropTable(TablePath tablePath, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + checkNotNull(tablePath, "tablePath cannot be null"); + try { + fileSystem.delete(new org.apache.hadoop.fs.Path(tablePath.getTableName()), true); + } catch (IOException e) { + throw new CatalogException("Drop table failed", e); + } + } + + @Override + public void createDatabase(TablePath tablePath, boolean ignoreIfExists) + throws DatabaseAlreadyExistException, CatalogException { + // todo: Do we need to set schema? + checkNotNull(tablePath, "tablePath cannot be null"); + try { + fileSystem.create(new org.apache.hadoop.fs.Path(tablePath.getTableName())); + } catch (FileAlreadyExistsException e) { + if (!ignoreIfExists) { + throw new TableAlreadyExistException(catalogName, tablePath, e); + } + } catch (IOException e) { + throw new CatalogException( + String.format( + "Create table %s at catalog %s failed", + tablePath.getTableName(), catalogName), + e); + } + } + + @Override + public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists) + throws DatabaseNotExistException, CatalogException { + checkNotNull(tablePath, "tablePath cannot be null"); + try { + fileSystem.delete(new org.apache.hadoop.fs.Path(tablePath.getDatabaseName()), true); + } catch (IOException e) { + throw new CatalogException( + String.format("Drop database: %s failed", tablePath.getDatabaseName()), e); + } + } +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3CatalogFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3CatalogFactory.java new file mode 100644 index 000000000..569f070e6 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3CatalogFactory.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.file.s3.catalog; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.catalog.Catalog; +import org.apache.seatunnel.api.table.factory.CatalogFactory; +import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType; + +public class S3CatalogFactory implements CatalogFactory { + + @Override + public Catalog createCatalog(String catalogName, ReadonlyConfig options) { + // todo: + Config config = ConfigFactory.parseMap(options.toMap()); + return new S3Catalog(catalogName, config); + } + + @Override + public String factoryIdentifier() { + return FileSystemType.S3.getFileSystemPluginName(); + } + + @Override + public OptionRule optionRule() { + return null; + } +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3DataTypeConvertor.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3DataTypeConvertor.java new file mode 100644 index 000000000..93f175600 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3DataTypeConvertor.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.file.s3.catalog; + +import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; +import org.apache.seatunnel.api.table.catalog.DataTypeConvertException; +import org.apache.seatunnel.api.table.catalog.DataTypeConvertor; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; + +import com.google.auto.service.AutoService; + +import java.util.Map; + +import static com.google.common.base.Preconditions.checkNotNull; + +@AutoService(DataTypeConvertor.class) +public class S3DataTypeConvertor implements DataTypeConvertor<SeaTunnelRowType> { + @Override + public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) { + checkNotNull(connectorDataType, "connectorDataType can not be null"); + return CatalogTableUtil.parseDataType(connectorDataType); + } + + @Override + public SeaTunnelDataType<?> toSeaTunnelType( + SeaTunnelRowType connectorDataType, Map<String, Object> dataTypeProperties) + throws DataTypeConvertException { + return connectorDataType; + } + + @Override + public SeaTunnelRowType toConnectorType( + SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> dataTypeProperties) + throws DataTypeConvertException { + // transform SeaTunnelDataType to SeaTunnelRowType + if (!(seaTunnelDataType instanceof SeaTunnelRowType)) { + throw DataTypeConvertException.convertToConnectorDataTypeException(seaTunnelDataType); + } + return (SeaTunnelRowType) seaTunnelDataType; + } + + @Override + public String getIdentity() { + return "S3"; + } +} diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaCatalog.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaCatalog.java new file mode 100644 index 000000000..d23890d47 --- /dev/null +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaCatalog.java @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.kafka.catalog; + +import org.apache.seatunnel.api.table.catalog.Catalog; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.catalog.exception.CatalogException; +import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException; +import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException; +import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException; +import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException; +import org.apache.seatunnel.connectors.seatunnel.kafka.config.Config; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.CreateTopicsResult; +import org.apache.kafka.clients.admin.DeleteTopicsResult; +import org.apache.kafka.clients.admin.DescribeTopicsResult; +import org.apache.kafka.clients.admin.ListTopicsResult; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartitionInfo; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutionException; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * This is a KafkaCatalog implementation. + * + * <p>In kafka the database and table both are the topic name. + */ +public class KafkaCatalog implements Catalog { + + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaCatalog.class); + private final String catalogName; + private final String bootstrapServers; + private final String defaultTopic; + + private AdminClient adminClient; + + public KafkaCatalog(String catalogName, String defaultTopic, String bootstrapServers) { + this.catalogName = checkNotNull(catalogName, "catalogName cannot be null"); + this.bootstrapServers = checkNotNull(bootstrapServers, "bootstrapServers cannot be null"); + this.defaultTopic = checkNotNull(defaultTopic, "defaultTopic cannot be null"); + } + + @Override + public void open() throws CatalogException { + Properties properties = new Properties(); + properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + + adminClient = AdminClient.create(properties); + try { + TopicDescription topicDescription = getTopicDescription(defaultTopic); + if (topicDescription == null) { + throw new DatabaseNotExistException(catalogName, defaultTopic); + } + LOGGER.info( + "Catalog {} is established connection to {}, the default database is {}", + catalogName, + bootstrapServers, + topicDescription.name()); + } catch (DatabaseNotExistException e) { + throw e; + } catch (Exception e) { + throw new CatalogException( + String.format( + "Catalog : %s establish connection to %s error", + catalogName, bootstrapServers), + e); + } + } + + @Override + public void close() throws CatalogException { + adminClient.close(); + } + + @Override + public String getDefaultDatabase() throws CatalogException { + return defaultTopic; + } + + @Override + public boolean databaseExists(String databaseName) throws CatalogException { + checkNotNull(databaseName, "databaseName cannot be null"); + try { + TopicDescription topicDescription = getTopicDescription(databaseName); + return topicDescription != null; + } catch (Exception e) { + throw new CatalogException( + String.format( + "Catalog : %s check database : %s exists error", + catalogName, databaseName), + e); + } + } + + @Override + public List<String> listDatabases() throws CatalogException { + try { + ListTopicsResult listTopicsResult = adminClient.listTopics(); + Set<String> topics = listTopicsResult.names().get(); + return Lists.newArrayList(topics); + } catch (InterruptedException | ExecutionException e) { + throw new CatalogException( + String.format("Listing database in catalog %s error", catalogName), e); + } + } + + @Override + public List<String> listTables(String databaseName) + throws CatalogException, DatabaseNotExistException { + if (!databaseExists(databaseName)) { + throw new DatabaseNotExistException(catalogName, databaseName); + } + return Lists.newArrayList(databaseName); + } + + @Override + public boolean tableExists(TablePath tablePath) throws CatalogException { + checkNotNull(tablePath, "tablePath cannot be null"); + return databaseExists(tablePath.getDatabaseName()); + } + + @Override + public CatalogTable getTable(TablePath tablePath) + throws CatalogException, TableNotExistException { + checkNotNull(tablePath, "tablePath cannot be null"); + TopicDescription topicDescription; + try { + topicDescription = getTopicDescription(tablePath.getTableName()); + if (topicDescription == null) { + throw new TableNotExistException(catalogName, tablePath); + } + } catch (ExecutionException | InterruptedException e) { + throw new CatalogException( + String.format("Catalog : %s get table : %s error", catalogName, tablePath), e); + } + TableIdentifier tableIdentifier = + TableIdentifier.of( + catalogName, tablePath.getDatabaseName(), tablePath.getTableName()); + // todo: Set the schema of the table? + TableSchema tableSchema = TableSchema.builder().build(); + return CatalogTable.of( + tableIdentifier, + tableSchema, + buildConnectorOptions(topicDescription), + Collections.emptyList(), + ""); + } + + @Override + public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + checkNotNull(tablePath, "tablePath cannot be null"); + if (tableExists(tablePath)) { + throw new TableAlreadyExistException(catalogName, tablePath); + } + Map<String, String> options = table.getOptions(); + int partitionNumber = Integer.parseInt(options.get(Config.PARTITION.key())); + short replicationFactor = Short.parseShort(options.get(Config.REPLICATION_FACTOR)); + NewTopic newTopic = + new NewTopic(tablePath.getTableName(), partitionNumber, replicationFactor); + CreateTopicsResult createTopicsResult = + adminClient.createTopics(Lists.newArrayList(newTopic)); + try { + createTopicsResult.all().get(); + } catch (ExecutionException | InterruptedException e) { + throw new CatalogException( + String.format( + "Catalog : %s create table : %s error", + catalogName, tablePath.getFullName()), + e); + } + } + + @Override + public void dropTable(TablePath tablePath, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + if (!tableExists(tablePath)) { + throw new TableNotExistException(catalogName, tablePath); + } + DeleteTopicsResult deleteTopicsResult = + adminClient.deleteTopics(Lists.newArrayList(tablePath.getTableName())); + try { + deleteTopicsResult.all().get(); + } catch (InterruptedException | ExecutionException e) { + throw new CatalogException( + String.format( + "Catalog : %s drop table : %s error", + catalogName, tablePath.getFullName()), + e); + } + } + + @Override + public void createDatabase(TablePath tablePath, boolean ignoreIfExists) + throws DatabaseAlreadyExistException, CatalogException { + // todo: We cannot create topic here, since we don't know the partition number and + // replication factor. + throw new UnsupportedOperationException("Kafka catalog does not support create database"); + } + + @Override + public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists) + throws DatabaseNotExistException, CatalogException { + // todo: + dropTable(tablePath, ignoreIfNotExists); + } + + private TopicDescription getTopicDescription(String topicName) + throws ExecutionException, InterruptedException { + DescribeTopicsResult describeTopicsResult = + adminClient.describeTopics(Lists.newArrayList(topicName)); + KafkaFuture<TopicDescription> topicDescriptionKafkaFuture = + describeTopicsResult.topicNameValues().get(topicName); + return topicDescriptionKafkaFuture.get(); + } + + private Map<String, String> buildConnectorOptions(TopicDescription topicDescription) { + String topicName = topicDescription.name(); + List<TopicPartitionInfo> partitions = topicDescription.partitions(); + List<Node> replicas = partitions.get(0).replicas(); + // todo: Do we need to support partition has different replication factor? + Map<String, String> options = new HashMap<>(); + options.put(Config.BOOTSTRAP_SERVERS.key(), bootstrapServers); + options.put(Config.TOPIC.key(), topicName); + options.put(Config.PARTITION.key(), String.valueOf(partitions.size())); + options.put(Config.REPLICATION_FACTOR, String.valueOf(replicas.size())); + return options; + } +} diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaCatalogFactory.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaCatalogFactory.java new file mode 100644 index 000000000..a30e459d0 --- /dev/null +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaCatalogFactory.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.kafka.catalog; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.catalog.Catalog; +import org.apache.seatunnel.api.table.factory.CatalogFactory; + +public class KafkaCatalogFactory implements CatalogFactory { + + @Override + public Catalog createCatalog(String catalogName, ReadonlyConfig options) { + // todo: Do we need to use singleton here? + return null; + } + + @Override + public String factoryIdentifier() { + return org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONNECTOR_IDENTITY; + } + + @Override + public OptionRule optionRule() { + // todo: + return null; + } +} diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertor.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertor.java new file mode 100644 index 000000000..20b4801ca --- /dev/null +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertor.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.kafka.catalog; + +import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; +import org.apache.seatunnel.api.table.catalog.DataTypeConvertException; +import org.apache.seatunnel.api.table.catalog.DataTypeConvertor; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; + +import com.google.auto.service.AutoService; + +import java.util.Map; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * The data type convertor of Kafka, only fields defined in schema has the type. e.g. + * + * <pre> + * schema = { + * fields { + * name = "string" + * age = "int" + * } + * } + * </pre> + * + * <p>Right now the data type of kafka is SeaTunnelType, so we don't need to convert the data type. + */ +@AutoService(DataTypeConvertor.class) +public class KafkaDataTypeConvertor implements DataTypeConvertor<SeaTunnelDataType<?>> { + + @Override + public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) { + checkNotNull(connectorDataType, "connectorDataType can not be null"); + return CatalogTableUtil.parseDataType(connectorDataType); + } + + @Override + public SeaTunnelDataType<?> toSeaTunnelType( + SeaTunnelDataType<?> connectorDataType, Map<String, Object> dataTypeProperties) + throws DataTypeConvertException { + return connectorDataType; + } + + @Override + public SeaTunnelDataType<?> toConnectorType( + SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> dataTypeProperties) + throws DataTypeConvertException { + return seaTunnelDataType; + } + + @Override + public String getIdentity() { + return "Kafka"; + } +} diff --git a/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertorTest.java b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertorTest.java new file mode 100644 index 000000000..c8f60025b --- /dev/null +++ b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertorTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.kafka.catalog; + +import org.apache.seatunnel.api.table.type.BasicType; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Collections; + +class KafkaDataTypeConvertorTest { + + private final KafkaDataTypeConvertor kafkaDataTypeConvertor = new KafkaDataTypeConvertor(); + + @Test + void toSeaTunnelType() { + Assertions.assertEquals( + BasicType.STRING_TYPE, + kafkaDataTypeConvertor.toConnectorType( + BasicType.STRING_TYPE, Collections.emptyMap())); + } + + @Test + void toConnectorType() { + Assertions.assertEquals( + BasicType.STRING_TYPE, + kafkaDataTypeConvertor.toConnectorType( + BasicType.STRING_TYPE, Collections.emptyMap())); + } +}
