LadyForest commented on code in PR #173:
URL: https://github.com/apache/flink-table-store/pull/173#discussion_r905058925
##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java:
##########
@@ -16,39 +16,192 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.file.catalog;
+package org.apache.flink.table.store.connector;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.store.file.catalog.Catalog;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+import static org.apache.flink.table.store.connector.FlinkCatalogFactory.IDENTIFIER;
+import static org.apache.flink.table.store.file.FileStoreOptions.PATH;
/** Catalog for table store. */
-public abstract class TableStoreCatalog extends AbstractCatalog {
+public class FlinkCatalog extends AbstractCatalog {
+
+ private final Catalog catalog;
- public TableStoreCatalog(String name, String defaultDatabase) {
+ public FlinkCatalog(Catalog catalog, String name, String defaultDatabase) {
super(name, defaultDatabase);
+ this.catalog = catalog;
+ }
+
+ @VisibleForTesting
+ Catalog catalog() {
+ return catalog;
+ }
+
+ @Override
+ public Optional<Factory> getFactory() {
+ return Optional.of(
+ FactoryUtil.discoverFactory(classLoader(),
DynamicTableFactory.class, IDENTIFIER));
+ }
+
+ @Override
+ public List<String> listDatabases() throws CatalogException {
+ return catalog.listDatabases();
+ }
+
+ @Override
+ public boolean databaseExists(String databaseName) throws CatalogException
{
+ return true;
+ }
+
+ @Override
+ public void dropDatabase(String name, boolean ignoreIfNotExists, boolean
cascade)
+ throws DatabaseNotEmptyException, CatalogException {
+ try {
+ catalog.dropDatabase(name, cascade);
+ } catch (Catalog.DatabaseNotEmptyException e) {
+ throw new DatabaseNotEmptyException(getName(), e.database());
+ }
+ }
+
+ @Override
+ public List<String> listTables(String databaseName) throws
CatalogException {
+ return catalog.listTables(databaseName);
+ }
+
+ @Override
+ public CatalogBaseTable getTable(ObjectPath tablePath)
+ throws TableNotExistException, CatalogException {
+ TableSchema schema;
+ try {
+ schema = catalog.getTable(tablePath);
+ } catch (Catalog.TableNotExistException e) {
+ throw new TableNotExistException(getName(), e.tablePath());
+ }
+
+ CatalogTable table = schema.toUpdateSchema().toCatalogTable();
+ // add path to source and sink
+ table.getOptions().put(PATH.key(),
catalog.getTableLocation(tablePath).toString());
+ return table;
+ }
+
+ @Override
+ public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+ return catalog.tableExists(tablePath);
+ }
+
+ @Override
+ public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ try {
+ catalog.dropTable(tablePath, ignoreIfNotExists);
+ } catch (Catalog.TableNotExistException e) {
+ throw new TableNotExistException(getName(), e.tablePath());
+ }
+ }
+
+ @Override
+ public void createTable(ObjectPath tablePath, CatalogBaseTable table,
boolean ignoreIfExists)
+ throws TableAlreadyExistException, DatabaseNotExistException,
CatalogException {
+ try {
+ catalog.createTable(tablePath, convertTableToSchema(tablePath,
table), ignoreIfExists);
+ } catch (Catalog.TableAlreadyExistException e) {
+ throw new TableAlreadyExistException(getName(), e.tablePath());
+ }
+ }
+
+ @Override
+ public void alterTable(
+ ObjectPath tablePath, CatalogBaseTable newTable, boolean
ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ try {
+ catalog.alterTable(
+ tablePath, convertTableToSchema(tablePath, newTable),
ignoreIfNotExists);
+ } catch (Catalog.TableNotExistException e) {
+ throw new TableNotExistException(getName(), e.tablePath());
+ }
+ }
+
+ private static ClassLoader classLoader() {
+ return FlinkCatalog.class.getClassLoader();
+ }
+
+ private UpdateSchema convertTableToSchema(ObjectPath tablePath,
CatalogBaseTable baseTable) {
+ if (!(baseTable instanceof CatalogTable)) {
+ throw new UnsupportedOperationException(
+ "Only support CatalogTable, but is: " +
baseTable.getClass());
+ }
+ CatalogTable table = (CatalogTable) baseTable;
+ Map<String, String> options = table.getOptions();
+ if (options.containsKey(CONNECTOR.key())) {
+ throw new CatalogException(
+ "Table Store Catalog only supports table store tables, not Flink connector: "
+ + options.get(CONNECTOR.key()));
+ }
+
+ // remove table path
+ String specific = options.remove(PATH.key());
+ if (specific != null) {
+ if (!catalog.getTableLocation(tablePath).equals(new
Path(specific))) {
+ throw new IllegalArgumentException(
+ "Illegal table path in table options: " + specific);
+ }
+ table = table.copy(options);
+ }
+
+ return UpdateSchema.fromCatalogTable(table);
}
// --------------------- unsupported methods ----------------------------
+ @Override
+ public void createDatabase(String name, CatalogDatabase database, boolean
ignoreIfExists)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
Review Comment:
Just wondering: why is `createDatabase` unsupported here (it throws `UnsupportedOperationException`)?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]