atallahade opened a new pull request, #6040: URL: https://github.com/apache/paimon/pull/6040
<!-- Please specify the module before the PR name: [core] ... or [flink] ... --> ### Purpose <!-- Linking this pull request to the issue --> Linked issue: close #6039 <!-- What is the purpose of the change --> As explained in the issue, Flink procedures don't parse delimited identifiers correctly when they contain a dot. This change correctly extract the database and table name when using the `Identifier.fromString` method. This is the list of all places I found to use the method: ``` $ grep -r "Identifier\.fromString" --include="*.java" --exclude="*Test*.java" --exclude-dir="test" . ./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CreateOrReplaceTagBaseProcedure.java: Table table = catalog.getTable(Identifier.fromString(tableId)); ./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java: Identifier identifier = Identifier.fromString(tableId); ./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java: Identifier identifier = Identifier.fromString(tableId); ./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java: Identifier identifier = Identifier.fromString(tableId); ./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedure.java: Table table = catalog.getTable(Identifier.fromString(tableId)); ./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java: Identifier identifier = Identifier.fromString(targetTableId); ./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RollbackToTimestampProcedure.java: Table table = catalog.getTable(Identifier.fromString(tableId)); ./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java: Table table = catalog.getTable(Identifier.fromString(tableId)); ./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java: Table table = catalog.getTable(Identifier.fromString(tableId)); ./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java: Identifier sourceTableId = Identifier.fromString(sourceTablePath); ./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java: Identifier targetTableId = Identifier.fromString(targetTablePath); ./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java: Identifier sourceTableId = Identifier.fromString(sourceTablePath); ./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java: Identifier targetTableId = Identifier.fromString(targetTablePath); ./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java: ((FileStoreTable) catalog.getTable(Identifier.fromString(tableId))).purgeFiles(); ./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java: (FileStoreTable) catalog.getTable(Identifier.fromString(tableId)); ./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java: (FileStoreTable) catalog.getTable(Identifier.fromString(tableId)); ./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedure.java: Table table = catalog.getTable(Identifier.fromString(sourceTablePath)); ./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java: (FileStoreTable) catalog.getTable(Identifier.fromString(tableId)); ./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java: Table table = catalog.getTable(Identifier.fromString(tableId)); ./paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/ogg/OggRecordParser.java: return Identifier.fromString(node.asText()); ./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java: Identifier identifier = Identifier.fromString(record.getValue().f1); ./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializer.java: Identifier.fromString(new String(serializedKey, StandardCharsets.UTF_8)); ./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateOrReplaceTagBaseProcedure.java: Table table = catalog.getTable(Identifier.fromString(tableId)); ./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java: Identifier identifier = Identifier.fromString(tableId); ./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/TriggerTagAutomaticCreationProcedure.java: ((FileStoreTable) catalog.getTable(Identifier.fromString(tableId))) ./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/AlterViewDialectProcedure.java: Identifier identifier = Identifier.fromString(view); ./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java: Identifier identifier = Identifier.fromString(tableId); ./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropFunctionProcedure.java: Identifier identifier = Identifier.fromString(function); ./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/FastForwardProcedure.java: Table table = catalog.getTable(Identifier.fromString(tableId)); ./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/AlterColumnDefaultValueProcedure.java: Identifier identifier = Identifier.fromString(table); ./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java: Table table = catalog.getTable(Identifier.fromString(tableId)); ./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RescaleProcedure.java: Identifier identifier = Identifier.fromString(tableId); ./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java: Identifier identifier = Identifier.fromString(tableId); ./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveUnexistingFilesProcedure.java: Identifier identifier = Identifier.fromString(tableId); ./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedure.java: Table table = catalog.getTable(Identifier.fromString(tableId)); ./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java: Identifier identifier = Identifier.fromString(targetTableId); ./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToTimestampProcedure.java: Table table = catalog.getTable(Identifier.fromString(tableId)); ./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateFunctionProcedure.java: Identifier identifier = Identifier.fromString(function); ./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java: Table table = catalog.getTable(Identifier.fromString(tableId)); ./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteTagProcedure.java: Table table = catalog.getTable(Identifier.fromString(tableId)); ./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java: Identifier sourceTableId = Identifier.fromString(sourceTablePath); ./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java: Identifier targetTableId = Identifier.fromString(targetPaimonTablePath); ./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RenameTagProcedure.java: Table table = catalog.getTable(Identifier.fromString(tableId)); ./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java: Identifier sourceTableId = Identifier.fromString(sourceTable); ./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java: Identifier.fromString( ./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java: return catalog.getTable(Identifier.fromString(tableId)); ./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java: ((FileStoreTable) catalog.getTable(Identifier.fromString(tableId))).purgeFiles(); ./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java: (FileStoreTable) catalog.getTable(Identifier.fromString(tableId)); ./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedure.java: Table table = catalog.getTable(Identifier.fromString(sourceTablePath)); ./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java: (FileStoreTable) catalog.getTable(Identifier.fromString(tableId)); ./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/AlterFunctionProcedure.java: Identifier identifier = Identifier.fromString(function); ./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java: Table table = catalog.getTable(Identifier.fromString(tableId)); ./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/QueryServiceProcedure.java: Table table = catalog.getTable(Identifier.fromString(tableId)); ./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteBranchProcedure.java: Identifier identifier = Identifier.fromString(tableId); ./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyFilesUtil.java: return ((FileStoreTable) catalog.getTable(Identifier.fromString(key))) ./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyFilesUtil.java: return pathOfTable(catalog.getTable(Identifier.fromString(key))); ./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyManifestFileOperator.java: Identifier sourceIdentifier = Identifier.fromString(copyFileInfo.getSourceIdentifier()); ./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/SnapshotHintOperator.java: (FileStoreTable) targetCatalog.getTable(Identifier.fromString(identifier)); ./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyMetaFilesFunction.java: Identifier sourceIdentifier = Identifier.fromString(sourceIdentifierStr); ./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyMetaFilesFunction.java: Identifier targetIdentifier = Identifier.fromString(targetIdentifierStr); ./paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java: identifier = org.apache.paimon.catalog.Identifier.fromString(tableId); ./paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java: org.apache.paimon.catalog.Identifier.fromString( ./paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveUnexistingFilesProcedure.java: org.apache.paimon.catalog.Identifier.fromString( ./paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateTableProcedure.java: Identifier sourceTableId = Identifier.fromString(sourceTable); ./paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateTableProcedure.java: ? Identifier.fromString(sourceTable + TMP_TBL_SUFFIX) ./paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateTableProcedure.java: : Identifier.fromString(targetTable); ``` AFAICT, this will affect: * Flink procedures * The Flink CDC `OggRecordParser` parser * The Flink `MultiTablesReadOperator` source operator * The Flink `WrappedManifestCommittableSerializer` * Flink copy related methods * Spark procedures Both [Flink](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/table/common/#expanding-table-identifiers) and [Spark](https://spark.apache.org/docs/latest/sql-ref-identifier.html) use backticks for delimited identifiers. Relates to #5390. ### Tests <!-- List UT and IT cases to verify this change --> I have added unit tests for the `Identifier.fromString` method. I will also test this with some Flink procedures. ### API and Format <!-- Does this change affect API or storage format --> ### Documentation <!-- Does this change introduce a new feature --> -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@paimon.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org