This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 77670204c9 [core] Remove Catalog.getTableLocation interface (#4718)
77670204c9 is described below
commit 77670204c98501f9e86f04bc16d651ba52f9594a
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Dec 16 19:41:24 2024 +0800
[core] Remove Catalog.getTableLocation interface (#4718)
---
.../org/apache/paimon/catalog/AbstractCatalog.java | 1 -
.../java/org/apache/paimon/catalog/Catalog.java | 9 -----
.../org/apache/paimon/catalog/DelegateCatalog.java | 6 ----
.../java/org/apache/paimon/rest/RESTCatalog.java | 6 ----
.../java/org/apache/paimon/flink/FlinkCatalog.java | 15 --------
.../paimon/flink/clone/CopyFileOperator.java | 41 ++++++++++++++++++----
.../org/apache/paimon/flink/FlinkCatalogTest.java | 8 ++---
.../sink/partition/PartitionMarkDoneTest.java | 2 +-
8 files changed, 38 insertions(+), 50 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index b56fec279a..a1b41e3b8a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -496,7 +496,6 @@ public abstract class AbstractCatalog implements Catalog {
return Optional.empty();
}
- @Override
public Path getTableLocation(Identifier identifier) {
return new Path(newDatabasePath(identifier.getDatabaseName()),
identifier.getTableName());
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index d919c59782..c3808caa13 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -20,7 +20,6 @@ package org.apache.paimon.catalog;
import org.apache.paimon.annotation.Public;
import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
@@ -136,14 +135,6 @@ public interface Catalog extends AutoCloseable {
*/
Table getTable(Identifier identifier) throws TableNotExistException;
- /**
- * Get the table location in this catalog. If the table exists, return the
location of the
- * table; If the table does not exist, construct the location for table.
- *
- * @return the table location
- */
- Path getTableLocation(Identifier identifier);
-
/**
* Get names of all tables under this database. An empty list is returned
if none exists.
*
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
index ec14d53a2b..2298626b0e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
@@ -19,7 +19,6 @@
package org.apache.paimon.catalog;
import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
@@ -147,11 +146,6 @@ public class DelegateCatalog implements Catalog {
wrapped.renameView(fromView, toView, ignoreIfNotExists);
}
- @Override
- public Path getTableLocation(Identifier identifier) {
- return wrapped.getTableLocation(identifier);
- }
-
@Override
public void createPartition(Identifier identifier, Map<String, String>
partitions)
throws TableNotExistException {
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 03b257efbf..86b87e25e8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -22,7 +22,6 @@ import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Database;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
@@ -177,11 +176,6 @@ public class RESTCatalog implements Catalog {
throw new UnsupportedOperationException();
}
- @Override
- public Path getTableLocation(Identifier identifier) {
- throw new UnsupportedOperationException();
- }
-
@Override
public List<String> listTables(String databaseName) throws
DatabaseNotExistException {
return new ArrayList<String>();
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 3a7f9790cc..dd95c48af8 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -25,7 +25,6 @@ import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.procedure.ProcedureUtil;
import org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil;
import org.apache.paimon.flink.utils.FlinkDescriptorProperties;
-import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.options.Options;
@@ -525,20 +524,6 @@ public class FlinkCatalog extends AbstractCatalog {
registerLogSystem(catalog, identifier, options, classLoader);
}
- // remove table path
- String path = options.remove(PATH.key());
- if (path != null) {
- Path expectedPath = catalog.getTableLocation(identifier);
- if (!new Path(path).equals(expectedPath)) {
- throw new CatalogException(
- String.format(
- "You specified the Path when creating the
table, "
- + "but the Path '%s' is different from
where it should be '%s'. "
- + "Please remove the Path.",
- path, expectedPath));
- }
- }
-
if (catalogTable instanceof CatalogTable) {
return fromCatalogTable(((CatalogTable)
catalogTable).copy(options));
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java
index d355424299..8bcaa2a207 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java
@@ -18,12 +18,14 @@
package org.apache.paimon.flink.clone;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
+import org.apache.paimon.table.Table;
import org.apache.paimon.utils.IOUtils;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -32,6 +34,7 @@ import
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashMap;
import java.util.Map;
/** A Operator to copy files. */
@@ -43,8 +46,11 @@ public class CopyFileOperator extends
AbstractStreamOperator<CloneFileInfo>
private final Map<String, String> sourceCatalogConfig;
private final Map<String, String> targetCatalogConfig;
- private Catalog sourceCatalog;
- private Catalog targetCatalog;
+ private transient Catalog sourceCatalog;
+ private transient Catalog targetCatalog;
+
+ private transient Map<String, Path> srcLocations;
+ private transient Map<String, Path> targetLocations;
public CopyFileOperator(
Map<String, String> sourceCatalogConfig, Map<String, String>
targetCatalogConfig) {
@@ -58,6 +64,8 @@ public class CopyFileOperator extends
AbstractStreamOperator<CloneFileInfo>
FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(sourceCatalogConfig));
targetCatalog =
FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(targetCatalogConfig));
+ srcLocations = new HashMap<>();
+ targetLocations = new HashMap<>();
}
@Override
@@ -66,12 +74,29 @@ public class CopyFileOperator extends
AbstractStreamOperator<CloneFileInfo>
FileIO sourceTableFileIO = sourceCatalog.fileIO();
FileIO targetTableFileIO = targetCatalog.fileIO();
+
Path sourceTableRootPath =
- sourceCatalog.getTableLocation(
-
Identifier.fromString(cloneFileInfo.getSourceIdentifier()));
+ srcLocations.computeIfAbsent(
+ cloneFileInfo.getSourceIdentifier(),
+ key -> {
+ try {
+ return pathOfTable(
+
sourceCatalog.getTable(Identifier.fromString(key)));
+ } catch (Catalog.TableNotExistException e) {
+ throw new RuntimeException(e);
+ }
+ });
Path targetTableRootPath =
- targetCatalog.getTableLocation(
-
Identifier.fromString(cloneFileInfo.getTargetIdentifier()));
+ targetLocations.computeIfAbsent(
+ cloneFileInfo.getTargetIdentifier(),
+ key -> {
+ try {
+ return pathOfTable(
+
targetCatalog.getTable(Identifier.fromString(key)));
+ } catch (Catalog.TableNotExistException e) {
+ throw new RuntimeException(e);
+ }
+ });
String filePathExcludeTableRoot =
cloneFileInfo.getFilePathExcludeTableRoot();
Path sourcePath = new Path(sourceTableRootPath +
filePathExcludeTableRoot);
@@ -110,6 +135,10 @@ public class CopyFileOperator extends
AbstractStreamOperator<CloneFileInfo>
output.collect(streamRecord);
}
+ private Path pathOfTable(Table table) {
+ return new Path(table.options().get(CoreOptions.PATH.key()));
+ }
+
@Override
public void close() throws Exception {
if (sourceCatalog != null) {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
index e4286eb181..734a47dead 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
@@ -102,6 +102,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
/** Test for {@link FlinkCatalog}. */
public class FlinkCatalogTest {
+
private static final String TESTING_LOG_STORE = "testing";
private final ObjectPath path1 = new ObjectPath("db1", "t1");
@@ -348,12 +349,7 @@ public class FlinkCatalogTest {
CatalogTable table1 = createTable(options);
assertThatThrownBy(() -> catalog.createTable(this.path1, table1,
false))
.hasMessageContaining(
- "You specified the Path when creating the table, "
- + "but the Path '/unknown/path' is different
from where it should be");
-
- options.put(PATH.key(), warehouse + "/db1.db/t1");
- CatalogTable table2 = createTable(options);
- catalog.createTable(this.path1, table2, false);
+ "The current catalog FileSystemCatalog does not
support specifying the table path when creating a table.");
}
@ParameterizedTest
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
index f0f4596c61..9e5fe7ff9f 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
@@ -83,7 +83,7 @@ class PartitionMarkDoneTest extends TableTestBase {
.build();
catalog.createTable(identifier, schema, true);
FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
- Path location = catalog.getTableLocation(identifier);
+ Path location = table.location();
Path successFile = new Path(location, "a=0/_SUCCESS");
PartitionMarkDone markDone =
PartitionMarkDone.create(false, false, new
MockOperatorStateStore(), table).get();