This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch branch-0.8
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/branch-0.8 by this push:
new 766808247e [#5194] feat(flink): Support basic table DDL Operation for
paimon-catalog (#6255)
766808247e is described below
commit 766808247e3767ef92a3594af5aef00057c796a9
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Jan 15 19:05:01 2025 +0800
[#5194] feat(flink): Support basic table DDL Operation for paimon-catalog
(#6255)
### What changes were proposed in this pull request?
Support basic table DDL Operation for paimon-catalog
### Why are the changes needed?
Fix: #5194
### Does this PR introduce _any_ user-facing change?
None.
### How was this patch tested?
org.apache.gravitino.flink.connector.integration.test.paimon.FlinkPaimonCatalogIT
Co-authored-by: yangyang zhong <[email protected]>
---
.../flink/connector/catalog/BaseCatalog.java | 4 ++--
.../connector/paimon/GravitinoPaimonCatalog.java | 24 +++++++++++++++++++++
.../connector/integration/test/FlinkEnvIT.java | 8 ++-----
.../integration/test/hive/FlinkHiveCatalogIT.java | 25 ++++++++++++++++++++++
.../test/paimon/FlinkPaimonCatalogIT.java | 10 ---------
5 files changed, 53 insertions(+), 18 deletions(-)
diff --git
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java
index 1496742177..fd8e118ee4 100644
---
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java
+++
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java
@@ -656,11 +656,11 @@ public abstract class BaseCatalog extends AbstractCatalog
{
return schemaChanges.toArray(new SchemaChange[0]);
}
- private Catalog catalog() {
+ protected Catalog catalog() {
return GravitinoCatalogManager.get().getGravitinoCatalogInfo(getName());
}
- private String catalogName() {
+ protected String catalogName() {
return getName();
}
}
diff --git
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java
index 017ac6e708..c22e00fa12 100644
---
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java
+++
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java
@@ -19,10 +19,17 @@
package org.apache.gravitino.flink.connector.paimon;
+import java.util.Optional;
import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.factories.Factory;
+import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.flink.connector.PartitionConverter;
import org.apache.gravitino.flink.connector.PropertiesConverter;
import org.apache.gravitino.flink.connector.catalog.BaseCatalog;
+import org.apache.paimon.flink.FlinkTableFactory;
/**
* The GravitinoPaimonCatalog class is an implementation of the BaseCatalog
class that is used to
@@ -45,4 +52,21 @@ public class GravitinoPaimonCatalog extends BaseCatalog {
protected AbstractCatalog realCatalog() {
return paimonCatalog;
}
+
+ @Override
+ public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ boolean dropped =
+ catalog()
+ .asTableCatalog()
+ .purgeTable(NameIdentifier.of(tablePath.getDatabaseName(),
tablePath.getObjectName()));
+ if (!dropped && !ignoreIfNotExists) {
+ throw new TableNotExistException(catalogName(), tablePath);
+ }
+ }
+
+ @Override
+ public Optional<Factory> getFactory() {
+ return Optional.of(new FlinkTableFactory());
+ }
}
diff --git
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java
index 5ae8847c6c..f56b5297e1 100644
---
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java
+++
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java
@@ -19,7 +19,6 @@
package org.apache.gravitino.flink.connector.integration.test;
import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
import com.google.errorprone.annotations.FormatMethod;
import com.google.errorprone.annotations.FormatString;
import java.io.IOException;
@@ -159,17 +158,14 @@ public abstract class FlinkEnvIT extends BaseIT {
return tableEnv.executeSql(String.format(sql, args));
}
- protected static void doWithSchema(
+ protected void doWithSchema(
Catalog catalog, String schemaName, Consumer<Catalog> action, boolean
dropSchema) {
Preconditions.checkNotNull(catalog);
Preconditions.checkNotNull(schemaName);
try {
tableEnv.useCatalog(catalog.name());
if (!catalog.asSchemas().schemaExists(schemaName)) {
- catalog
- .asSchemas()
- .createSchema(
- schemaName, null, ImmutableMap.of("location", warehouse + "/"
+ schemaName));
+ catalog.asSchemas().createSchema(schemaName, null, null);
}
tableEnv.useDatabase(schemaName);
action.accept(catalog);
diff --git
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
index 333aa83f0b..bb7b25f6b2 100644
---
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
+++
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
@@ -29,6 +29,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
+import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
@@ -586,4 +587,28 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
protected org.apache.gravitino.Catalog currentCatalog() {
return hiveCatalog;
}
+
+ protected void doWithSchema(
+ org.apache.gravitino.Catalog catalog,
+ String schemaName,
+ Consumer<org.apache.gravitino.Catalog> action,
+ boolean dropSchema) {
+ Preconditions.checkNotNull(catalog);
+ Preconditions.checkNotNull(schemaName);
+ try {
+ tableEnv.useCatalog(catalog.name());
+ if (!catalog.asSchemas().schemaExists(schemaName)) {
+ catalog
+ .asSchemas()
+ .createSchema(
+ schemaName, null, ImmutableMap.of("location", warehouse + "/"
+ schemaName));
+ }
+ tableEnv.useDatabase(schemaName);
+ action.accept(catalog);
+ } finally {
+ if (dropSchema) {
+ catalog.asSchemas().dropSchema(schemaName, true);
+ }
+ }
+ }
}
diff --git
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java
index 10fab3567a..57a17c2a11 100644
---
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java
+++
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java
@@ -42,16 +42,6 @@ public class FlinkPaimonCatalogIT extends FlinkCommonIT {
private static org.apache.gravitino.Catalog catalog;
- @Override
- protected boolean supportColumnOperation() {
- return false;
- }
-
- @Override
- protected boolean supportTableOperation() {
- return false;
- }
-
@Override
protected boolean supportSchemaOperationWithCommentAndOptions() {
return false;