This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/cdc-multiple-table by this
push:
new 7d7f50654 Add S3Catalog (#4121)
7d7f50654 is described below
commit 7d7f506547346c48258d0148fbae684d9074bb0d
Author: Wenjun Ruan <[email protected]>
AuthorDate: Tue Feb 14 11:22:10 2023 +0800
Add S3Catalog (#4121)
---
.../seatunnel/file/s3/catalog/S3Catalog.java | 186 +++++++++++++++++++++
.../file/s3/catalog/S3CatalogFactory.java | 48 ++++++
.../file/s3/catalog/S3DataTypeConvertor.java | 60 +++++++
3 files changed, 294 insertions(+)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3Catalog.java
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3Catalog.java
new file mode 100644
index 000000000..b8cc5aa00
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3Catalog.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.s3.catalog;
+
+import static
org.apache.seatunnel.shade.hadoop.com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import
org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
+import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Conf;
+import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Config;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * S3 catalog implementation.
+ * <p>The given path directory is the database/table.
+ */
+public class S3Catalog implements Catalog {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(S3Catalog.class);
+
+ private final String catalogName;
+ private final Config s3Config;
+
+ private String defaultDatabase;
+ private FileSystem fileSystem;
+
+ public S3Catalog(String catalogName, Config s3Config) {
+ this.catalogName = checkNotNull(catalogName, "catalogName cannot be
null");
+ this.s3Config = checkNotNull(s3Config, "s3Config cannot be null");
+ }
+
+ @Override
+ public void open() throws CatalogException {
+ ReadStrategy readStrategy =
ReadStrategyFactory.of(s3Config.getString(S3Config.FILE_TYPE.key()));
+ readStrategy.setPluginConfig(s3Config);
+ this.defaultDatabase = s3Config.getString(S3Config.FILE_PATH.key());
+ readStrategy =
ReadStrategyFactory.of(s3Config.getString(S3Config.FILE_TYPE.key()));
+ readStrategy.setPluginConfig(s3Config);
+ try {
+ fileSystem =
FileSystem.get(readStrategy.getConfiguration(S3Conf.buildWithConfig(s3Config)));
+ } catch (IOException e) {
+ throw new CatalogException("Open S3Catalog failed", e);
+ }
+ LOGGER.info("S3Catalog {} is opened", catalogName);
+ }
+
+ @Override
+ public void close() throws CatalogException {
+ LOGGER.info("S3Catalog {} is closed", catalogName);
+ }
+
+ @Override
+ public String getDefaultDatabase() throws CatalogException {
+ return defaultDatabase;
+ }
+
+ @Override
+ public boolean databaseExists(String databaseName) throws CatalogException
{
+ // check if the directory exists
+ try {
+ return fileSystem.getFileStatus(new
org.apache.hadoop.fs.Path(databaseName)).isDirectory();
+ } catch (FileNotFoundException e) {
+ LOGGER.debug("Database {} does not exist", databaseName, e);
+ return false;
+ } catch (Exception ex) {
+ throw new CatalogException("Check database exists failed", ex);
+ }
+ }
+
+ @Override
+ public List<String> listDatabases() throws CatalogException {
+ // todo: Do we need to find all sub directory as database?
+ if (databaseExists(defaultDatabase)) {
+ return Lists.newArrayList(defaultDatabase);
+ }
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<String> listTables(String databaseName) throws
CatalogException, DatabaseNotExistException {
+ if (databaseExists(databaseName)) {
+ return Lists.newArrayList(databaseName);
+ }
+ return Collections.emptyList();
+ }
+
+ @Override
+ public boolean tableExists(TablePath tablePath) throws CatalogException {
+ checkNotNull(tablePath, "tablePath cannot be null");
+ return databaseExists(tablePath.getTableName());
+ }
+
+ @Override
+ public CatalogTable getTable(TablePath tablePath) throws CatalogException,
TableNotExistException {
+ checkNotNull(tablePath, "tablePath cannot be null");
+ TableIdentifier tableIdentifier = TableIdentifier.of(catalogName,
tablePath.getDatabaseName(), tablePath.getTableName());
+ // todo:
+ TableSchema tableSchema = TableSchema.builder()
+ .build();
+ return CatalogTable.of(
+ tableIdentifier,
+ tableSchema,
+ Collections.emptyMap(),
+ Collections.emptyList(),
+ "");
+ }
+
+ @Override
+ public void createTable(TablePath tablePath, CatalogTable table, boolean
ignoreIfExists) throws TableAlreadyExistException, DatabaseNotExistException,
CatalogException {
+ createDatabase(tablePath, ignoreIfExists);
+ }
+
+ @Override
+ public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
+ checkNotNull(tablePath, "tablePath cannot be null");
+ try {
+ fileSystem.delete(new
org.apache.hadoop.fs.Path(tablePath.getTableName()), true);
+ } catch (IOException e) {
+ throw new CatalogException("Drop table failed", e);
+ }
+ }
+
+ @Override
+ public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
throws DatabaseAlreadyExistException, CatalogException {
+ // todo: Do we need to set schema?
+ checkNotNull(tablePath, "tablePath cannot be null");
+ try {
+ fileSystem.create(new
org.apache.hadoop.fs.Path(tablePath.getTableName()));
+ } catch (FileAlreadyExistsException e) {
+ if (!ignoreIfExists) {
+ throw new TableAlreadyExistException(catalogName, tablePath,
e);
+ }
+ } catch (IOException e) {
+ throw new CatalogException(String.format("Create table %s at
catalog %s failed", tablePath.getTableName(), catalogName), e);
+ }
+ }
+
+ @Override
+ public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
throws DatabaseNotExistException, CatalogException {
+ checkNotNull(tablePath, "tablePath cannot be null");
+ try {
+ fileSystem.delete(new
org.apache.hadoop.fs.Path(tablePath.getDatabaseName()), true);
+ } catch (IOException e) {
+ throw new CatalogException(String.format("Drop database: %s
failed", tablePath.getDatabaseName()), e);
+ }
+ }
+
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3CatalogFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3CatalogFactory.java
new file mode 100644
index 000000000..f0d8d8cc6
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3CatalogFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.s3.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+public class S3CatalogFactory implements CatalogFactory {
+
+ @Override
+ public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
+ // todo:
+ Config config = ConfigFactory.parseMap(options.toMap());
+ return new S3Catalog(catalogName, config);
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return FileSystemType.S3.getFileSystemPluginName();
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return null;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3DataTypeConvertor.java
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3DataTypeConvertor.java
new file mode 100644
index 000000000..d123ac66d
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3DataTypeConvertor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.s3.catalog;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertException;
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+
+import com.google.auto.service.AutoService;
+
+import java.util.Map;
+
+@AutoService(DataTypeConvertor.class)
+public class S3DataTypeConvertor implements
DataTypeConvertor<SeaTunnelRowType> {
+ @Override
+ public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
+ checkNotNull(connectorDataType, "connectorDataType can not be null");
+ return SeaTunnelSchema.parseTypeByString(connectorDataType);
+ }
+
+ @Override
+ public SeaTunnelDataType<?> toSeaTunnelType(SeaTunnelRowType
connectorDataType, Map<String, Object> dataTypeProperties) throws
DataTypeConvertException {
+ return connectorDataType;
+ }
+
+ @Override
+ public SeaTunnelRowType toConnectorType(SeaTunnelDataType<?>
seaTunnelDataType, Map<String, Object> dataTypeProperties) throws
DataTypeConvertException {
+ // transform SeaTunnelDataType to SeaTunnelRowType
+ if (!(seaTunnelDataType instanceof SeaTunnelRowType)) {
+ throw
DataTypeConvertException.convertToConnectorDataTypeException(seaTunnelDataType);
+ }
+ return (SeaTunnelRowType) seaTunnelDataType;
+ }
+
+ @Override
+ public String getIdentity() {
+ return FileSystemType.S3.getFileSystemPluginName();
+ }
+}