This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/cdc-multiple-table by this push:
     new 7d7f50654 Add S3Catalog (#4121)
7d7f50654 is described below

commit 7d7f506547346c48258d0148fbae684d9074bb0d
Author: Wenjun Ruan <[email protected]>
AuthorDate: Tue Feb 14 11:22:10 2023 +0800

    Add S3Catalog (#4121)
---
 .../seatunnel/file/s3/catalog/S3Catalog.java       | 186 +++++++++++++++++++++
 .../file/s3/catalog/S3CatalogFactory.java          |  48 ++++++
 .../file/s3/catalog/S3DataTypeConvertor.java       |  60 +++++++
 3 files changed, 294 insertions(+)

diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3Catalog.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3Catalog.java
new file mode 100644
index 000000000..b8cc5aa00
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3Catalog.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.s3.catalog;
+
+import static 
org.apache.seatunnel.shade.hadoop.com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
+import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import 
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Conf;
+import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Config;
+import 
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy;
+import 
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * S3 catalog implementation.
+ *
+ * <p>The given path directory is both the database and the table: {@link #listDatabases()} returns the configured
+ * file path when that directory exists, and {@link #listTables(String)} returns the database directory itself as its
+ * only table.
+ *
+ * <p>{@link #open()} must be called before the other methods, because it initializes the Hadoop {@link FileSystem}
+ * they operate on.
+ */
+public class S3Catalog implements Catalog {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(S3Catalog.class);
+
+    private final String catalogName;
+    private final Config s3Config;
+
+    private String defaultDatabase;
+    private FileSystem fileSystem;
+
+    public S3Catalog(String catalogName, Config s3Config) {
+        this.catalogName = checkNotNull(catalogName, "catalogName cannot be null");
+        this.s3Config = checkNotNull(s3Config, "s3Config cannot be null");
+    }
+
+    /**
+     * Reads the default database (the configured file path) and obtains the Hadoop {@link FileSystem} built from
+     * the S3 configuration.
+     *
+     * @throws CatalogException if the file system cannot be obtained
+     */
+    @Override
+    public void open() throws CatalogException {
+        // The read strategy is only used to derive the Hadoop configuration, so it is built once.
+        ReadStrategy readStrategy = ReadStrategyFactory.of(s3Config.getString(S3Config.FILE_TYPE.key()));
+        readStrategy.setPluginConfig(s3Config);
+        this.defaultDatabase = s3Config.getString(S3Config.FILE_PATH.key());
+        try {
+            fileSystem = FileSystem.get(readStrategy.getConfiguration(S3Conf.buildWithConfig(s3Config)));
+        } catch (IOException e) {
+            throw new CatalogException("Open S3Catalog failed", e);
+        }
+        LOGGER.info("S3Catalog {} is opened", catalogName);
+    }
+
+    /**
+     * Logs that the catalog is closed.
+     *
+     * <p>NOTE(review): the {@link FileSystem} is intentionally not closed here. {@code FileSystem.get} may return an
+     * instance shared through Hadoop's FileSystem cache, and closing it could break other users of that instance;
+     * confirm ownership before adding a close.
+     */
+    @Override
+    public void close() throws CatalogException {
+        LOGGER.info("S3Catalog {} is closed", catalogName);
+    }
+
+    /** Returns the configured file path, or {@code null} before {@link #open()} has been called. */
+    @Override
+    public String getDefaultDatabase() throws CatalogException {
+        return defaultDatabase;
+    }
+
+    /**
+     * Checks whether {@code databaseName} is an existing directory.
+     *
+     * @param databaseName directory path of the database
+     * @return {@code true} if the path exists and is a directory; {@code false} if it is missing or is a plain file
+     * @throws CatalogException if the status of the path cannot be determined
+     */
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException {
+        try {
+            return fileSystem.getFileStatus(toPath(databaseName)).isDirectory();
+        } catch (FileNotFoundException e) {
+            LOGGER.debug("Database {} does not exist", databaseName, e);
+            return false;
+        } catch (Exception ex) {
+            // Catalog boundary: any other failure (I/O, invalid path, ...) is reported as a CatalogException.
+            throw new CatalogException("Check database exists failed", ex);
+        }
+    }
+
+    /** Returns the default database if its directory exists, otherwise an empty list. */
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        // todo: Do we need to find all sub directory as database?
+        if (databaseExists(defaultDatabase)) {
+            return Lists.newArrayList(defaultDatabase);
+        }
+        return Collections.emptyList();
+    }
+
+    /**
+     * Lists the tables of a database. The database directory is itself the only table.
+     *
+     * @throws DatabaseNotExistException if {@code databaseName} is not an existing directory
+     */
+    @Override
+    public List<String> listTables(String databaseName) throws CatalogException, DatabaseNotExistException {
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(catalogName, databaseName);
+        }
+        return Lists.newArrayList(databaseName);
+    }
+
+    /** Checks whether the directory named by the table name of {@code tablePath} exists. */
+    @Override
+    public boolean tableExists(TablePath tablePath) throws CatalogException {
+        checkNotNull(tablePath, "tablePath cannot be null");
+        return databaseExists(tablePath.getTableName());
+    }
+
+    /**
+     * Returns the catalog table for {@code tablePath}.
+     *
+     * @throws TableNotExistException if the table directory does not exist
+     */
+    @Override
+    public CatalogTable getTable(TablePath tablePath) throws CatalogException, TableNotExistException {
+        checkNotNull(tablePath, "tablePath cannot be null");
+        if (!tableExists(tablePath)) {
+            throw new TableNotExistException(catalogName, tablePath);
+        }
+        TableIdentifier tableIdentifier =
+            TableIdentifier.of(catalogName, tablePath.getDatabaseName(), tablePath.getTableName());
+        // TODO: the schema is not populated yet; an empty schema is returned.
+        TableSchema tableSchema = TableSchema.builder().build();
+        return CatalogTable.of(
+            tableIdentifier,
+            tableSchema,
+            Collections.emptyMap(),
+            Collections.emptyList(),
+            "");
+    }
+
+    /** Creates the table directory; a table and its database share the same directory. */
+    @Override
+    public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists)
+        throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+        createDatabase(tablePath, ignoreIfExists);
+    }
+
+    /**
+     * Recursively deletes the directory named by the table name of {@code tablePath}.
+     *
+     * @throws TableNotExistException if the directory does not exist and {@code ignoreIfNotExists} is false
+     * @throws CatalogException if the deletion fails
+     */
+    @Override
+    public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
+        throws TableNotExistException, CatalogException {
+        checkNotNull(tablePath, "tablePath cannot be null");
+        org.apache.hadoop.fs.Path path = toPath(tablePath.getTableName());
+        try {
+            if (!fileSystem.exists(path)) {
+                if (ignoreIfNotExists) {
+                    return;
+                }
+                throw new TableNotExistException(catalogName, tablePath);
+            }
+            if (!fileSystem.delete(path, true)) {
+                throw new CatalogException("Drop table failed");
+            }
+        } catch (IOException e) {
+            throw new CatalogException("Drop table failed", e);
+        }
+    }
+
+    /**
+     * Creates the directory named by the table name of {@code tablePath}.
+     *
+     * @throws TableAlreadyExistException if the path already exists and {@code ignoreIfExists} is false
+     * @throws CatalogException if the directory cannot be created
+     */
+    @Override
+    public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
+        throws DatabaseAlreadyExistException, CatalogException {
+        // todo: Do we need to set schema?
+        checkNotNull(tablePath, "tablePath cannot be null");
+        org.apache.hadoop.fs.Path path = toPath(tablePath.getTableName());
+        try {
+            if (fileSystem.exists(path)) {
+                if (!ignoreIfExists) {
+                    throw new TableAlreadyExistException(catalogName, tablePath, null);
+                }
+                return;
+            }
+            // mkdirs rather than create(): databaseExists/tableExists look for a directory, and create() would open
+            // (and leak) an output stream and overwrite an existing file.
+            if (!fileSystem.mkdirs(path)) {
+                throw new CatalogException(String.format("Create table %s at catalog %s failed",
+                    tablePath.getTableName(), catalogName));
+            }
+        } catch (FileAlreadyExistsException e) {
+            if (!ignoreIfExists) {
+                throw new TableAlreadyExistException(catalogName, tablePath, e);
+            }
+        } catch (IOException e) {
+            throw new CatalogException(String.format("Create table %s at catalog %s failed",
+                tablePath.getTableName(), catalogName), e);
+        }
+    }
+
+    /**
+     * Recursively deletes the directory named by the database name of {@code tablePath}.
+     *
+     * @throws DatabaseNotExistException if the directory does not exist and {@code ignoreIfNotExists} is false
+     * @throws CatalogException if the deletion fails
+     */
+    @Override
+    public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
+        throws DatabaseNotExistException, CatalogException {
+        checkNotNull(tablePath, "tablePath cannot be null");
+        String databaseName = tablePath.getDatabaseName();
+        org.apache.hadoop.fs.Path path = toPath(databaseName);
+        try {
+            if (!fileSystem.exists(path)) {
+                if (ignoreIfNotExists) {
+                    return;
+                }
+                throw new DatabaseNotExistException(catalogName, databaseName);
+            }
+            if (!fileSystem.delete(path, true)) {
+                throw new CatalogException(String.format("Drop database: %s failed", databaseName));
+            }
+        } catch (IOException e) {
+            throw new CatalogException(String.format("Drop database: %s failed", databaseName), e);
+        }
+    }
+
+    /** Converts a database/table name into the Hadoop path of its directory. */
+    private static org.apache.hadoop.fs.Path toPath(String name) {
+        return new org.apache.hadoop.fs.Path(name);
+    }
+
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3CatalogFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3CatalogFactory.java
new file mode 100644
index 000000000..f0d8d8cc6
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3CatalogFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.s3.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+/**
+ * Factory creating {@link S3Catalog} instances, identified by the S3 file system plugin name.
+ *
+ * <p>NOTE(review): this class carries no SPI registration annotation; confirm how it is meant to be discovered.
+ */
+public class S3CatalogFactory implements CatalogFactory {
+
+    /**
+     * Creates an S3 catalog from the given options.
+     *
+     * @param catalogName name of the catalog
+     * @param options catalog options, converted into a typesafe {@link Config}
+     * @return a new, not yet opened, {@link S3Catalog}
+     */
+    @Override
+    public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
+        // todo:
+        Config config = ConfigFactory.parseMap(options.toMap());
+        return new S3Catalog(catalogName, config);
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return FileSystemType.S3.getFileSystemPluginName();
+    }
+
+    /**
+     * Returns the option rule of this factory.
+     *
+     * <p>An empty rule is returned instead of {@code null} so callers can inspect it safely.
+     * TODO: declare the options {@code S3Catalog#open} reads (file type and file path).
+     */
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder().build();
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3DataTypeConvertor.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3DataTypeConvertor.java
new file mode 100644
index 000000000..d123ac66d
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3DataTypeConvertor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.s3.catalog;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertException;
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+
+import com.google.auto.service.AutoService;
+
+import java.util.Map;
+
+@AutoService(DataTypeConvertor.class)
+public class S3DataTypeConvertor implements 
DataTypeConvertor<SeaTunnelRowType> {
+    @Override
+    public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
+        checkNotNull(connectorDataType, "connectorDataType can not be null");
+        return SeaTunnelSchema.parseTypeByString(connectorDataType);
+    }
+
+    @Override
+    public SeaTunnelDataType<?> toSeaTunnelType(SeaTunnelRowType 
connectorDataType, Map<String, Object> dataTypeProperties) throws 
DataTypeConvertException {
+        return connectorDataType;
+    }
+
+    @Override
+    public SeaTunnelRowType toConnectorType(SeaTunnelDataType<?> 
seaTunnelDataType, Map<String, Object> dataTypeProperties) throws 
DataTypeConvertException {
+        // transform SeaTunnelDataType to SeaTunnelRowType
+        if (!(seaTunnelDataType instanceof SeaTunnelRowType)) {
+            throw 
DataTypeConvertException.convertToConnectorDataTypeException(seaTunnelDataType);
+        }
+        return (SeaTunnelRowType) seaTunnelDataType;
+    }
+
+    @Override
+    public String getIdentity() {
+        return FileSystemType.S3.getFileSystemPluginName();
+    }
+}

Reply via email to