atallahade opened a new pull request, #6040:
URL: https://github.com/apache/paimon/pull/6040

   <!-- Please specify the module before the PR name: [core] ... or [flink] ... -->
   
   ### Purpose
   
   <!-- Linking this pull request to the issue -->
   Linked issue: close #6039
   
   <!-- What is the purpose of the change -->
   As explained in the issue, Flink procedures don't parse delimited identifiers correctly when they contain a dot.
   This change makes the `Identifier.fromString` method extract the database and table names correctly in that case.
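
For illustration only, here is a minimal standalone sketch of backtick-aware splitting. It is not the code in this PR: `DelimitedIdentifierSketch` and `splitParts` are hypothetical names, and treating a doubled backtick inside a delimited part as an escaped backtick is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration; not the implementation in this PR. */
class DelimitedIdentifierSketch {

    /**
     * Splits a name such as my_db.`my.table` on dots that sit outside backticks.
     * Assumption: a doubled backtick inside a delimited part is an escaped backtick.
     */
    static List<String> splitParts(String fullName) {
        List<String> parts = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean quoted = false;
        for (int i = 0; i < fullName.length(); i++) {
            char c = fullName.charAt(i);
            if (c == '`') {
                boolean doubled =
                        quoted && i + 1 < fullName.length() && fullName.charAt(i + 1) == '`';
                if (doubled) {
                    current.append('`'); // keep one literal backtick
                    i++; // skip the second backtick of the pair
                } else {
                    quoted = !quoted; // opening or closing delimiter
                }
            } else if (c == '.' && !quoted) {
                // A dot outside backticks separates two parts.
                parts.add(current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        if (quoted) {
            throw new IllegalArgumentException("Unclosed backtick in: " + fullName);
        }
        parts.add(current.toString());
        return parts;
    }

    public static void main(String[] args) {
        System.out.println(splitParts("my_db.`my.table`")); // [my_db, my.table]
        System.out.println(splitParts("`my.db`.orders")); // [my.db, orders]
    }
}
```

With this kind of splitting, a dot inside backticks stays part of the name, whereas a plain split on `.` would break `my.table` into two pieces.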
   
   These are all the places where I found the method used:
   ```
   $ grep -r "Identifier\.fromString" --include="*.java" --exclude="*Test*.java" --exclude-dir="test" .
   
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CreateOrReplaceTagBaseProcedure.java:
        Table table = catalog.getTable(Identifier.fromString(tableId));
   
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java:
        Identifier identifier = Identifier.fromString(tableId);
   
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java:
        Identifier identifier = Identifier.fromString(tableId);
   
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java:
        Identifier identifier = Identifier.fromString(tableId);
   
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedure.java:
        Table table = catalog.getTable(Identifier.fromString(tableId));
   
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java:
        Identifier identifier = Identifier.fromString(targetTableId);
   
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RollbackToTimestampProcedure.java:
        Table table = catalog.getTable(Identifier.fromString(tableId));
   
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java:
        Table table = catalog.getTable(Identifier.fromString(tableId));
   
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java:
        Table table = catalog.getTable(Identifier.fromString(tableId));
   
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java:
        Identifier sourceTableId = Identifier.fromString(sourceTablePath);
   
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java:
        Identifier targetTableId = Identifier.fromString(targetTablePath);
   
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java:
        Identifier sourceTableId = Identifier.fromString(sourceTablePath);
   
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java:
        Identifier targetTableId = Identifier.fromString(targetTablePath);
   
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java:
        ((FileStoreTable) catalog.getTable(Identifier.fromString(tableId))).purgeFiles();
   
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java:
                (FileStoreTable) catalog.getTable(Identifier.fromString(tableId));
   
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java:
                (FileStoreTable) catalog.getTable(Identifier.fromString(tableId));
   
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedure.java:
        Table table = catalog.getTable(Identifier.fromString(sourceTablePath));
   
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java:
                (FileStoreTable) catalog.getTable(Identifier.fromString(tableId));
   
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java:
        Table table = catalog.getTable(Identifier.fromString(tableId));
   
./paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/ogg/OggRecordParser.java:
        return Identifier.fromString(node.asText());
   
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java:
        Identifier identifier = Identifier.fromString(record.getValue().f1);
   
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializer.java:
                    Identifier.fromString(new String(serializedKey, StandardCharsets.UTF_8));
   
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateOrReplaceTagBaseProcedure.java:
        Table table = catalog.getTable(Identifier.fromString(tableId));
   
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java:
        Identifier identifier = Identifier.fromString(tableId);
   
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/TriggerTagAutomaticCreationProcedure.java:
        ((FileStoreTable) catalog.getTable(Identifier.fromString(tableId)))
   
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/AlterViewDialectProcedure.java:
        Identifier identifier = Identifier.fromString(view);
   
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java:
        Identifier identifier = Identifier.fromString(tableId);
   
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropFunctionProcedure.java:
        Identifier identifier = Identifier.fromString(function);
   
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/FastForwardProcedure.java:
        Table table = catalog.getTable(Identifier.fromString(tableId));
   
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/AlterColumnDefaultValueProcedure.java:
        Identifier identifier = Identifier.fromString(table);
   
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java:
        Table table = catalog.getTable(Identifier.fromString(tableId));
   
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RescaleProcedure.java:
        Identifier identifier = Identifier.fromString(tableId);
   
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java:
        Identifier identifier = Identifier.fromString(tableId);
   
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveUnexistingFilesProcedure.java:
        Identifier identifier = Identifier.fromString(tableId);
   
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedure.java:
        Table table = catalog.getTable(Identifier.fromString(tableId));
   
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java:
        Identifier identifier = Identifier.fromString(targetTableId);
   
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToTimestampProcedure.java:
        Table table = catalog.getTable(Identifier.fromString(tableId));
   
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateFunctionProcedure.java:
        Identifier identifier = Identifier.fromString(function);
   
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java:
        Table table = catalog.getTable(Identifier.fromString(tableId));
   
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteTagProcedure.java:
        Table table = catalog.getTable(Identifier.fromString(tableId));
   
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java:
        Identifier sourceTableId = Identifier.fromString(sourceTablePath);
   
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java:
        Identifier targetTableId = Identifier.fromString(targetPaimonTablePath);
   
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RenameTagProcedure.java:
        Table table = catalog.getTable(Identifier.fromString(tableId));
   
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java:
        Identifier sourceTableId = Identifier.fromString(sourceTable);
   
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java:
                Identifier.fromString(
   
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java:
        return catalog.getTable(Identifier.fromString(tableId));
   
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java:
        ((FileStoreTable) catalog.getTable(Identifier.fromString(tableId))).purgeFiles();
   
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java:
                (FileStoreTable) catalog.getTable(Identifier.fromString(tableId));
   
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedure.java:
        Table table = catalog.getTable(Identifier.fromString(sourceTablePath));
   
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java:
                (FileStoreTable) catalog.getTable(Identifier.fromString(tableId));
   
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/AlterFunctionProcedure.java:
        Identifier identifier = Identifier.fromString(function);
   
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java:
        Table table = catalog.getTable(Identifier.fromString(tableId));
   
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/QueryServiceProcedure.java:
        Table table = catalog.getTable(Identifier.fromString(tableId));
   
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteBranchProcedure.java:
        Identifier identifier = Identifier.fromString(tableId);
   
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyFilesUtil.java:
                        return ((FileStoreTable) catalog.getTable(Identifier.fromString(key)))
   
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyFilesUtil.java:
                        return pathOfTable(catalog.getTable(Identifier.fromString(key)));
   
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyManifestFileOperator.java:
        Identifier sourceIdentifier = Identifier.fromString(copyFileInfo.getSourceIdentifier());
   
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/SnapshotHintOperator.java:
                    (FileStoreTable) targetCatalog.getTable(Identifier.fromString(identifier));
   
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyMetaFilesFunction.java:
        Identifier sourceIdentifier = Identifier.fromString(sourceIdentifierStr);
   
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyMetaFilesFunction.java:
        Identifier targetIdentifier = Identifier.fromString(targetIdentifierStr);
   
./paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java:
            identifier = org.apache.paimon.catalog.Identifier.fromString(tableId);
   
./paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java:
                    org.apache.paimon.catalog.Identifier.fromString(
   
./paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveUnexistingFilesProcedure.java:
                org.apache.paimon.catalog.Identifier.fromString(
   
./paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateTableProcedure.java:
        Identifier sourceTableId = Identifier.fromString(sourceTable);
   
./paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateTableProcedure.java:
                        ? Identifier.fromString(sourceTable + TMP_TBL_SUFFIX)
   
./paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateTableProcedure.java:
                        : Identifier.fromString(targetTable);
   ```
   
   AFAICT, this will affect:
   * Flink procedures
   * The Flink CDC `OggRecordParser`
   * The Flink `MultiTablesReadOperator` source operator
   * The Flink `WrappedManifestCommittableSerializer`
   * Flink copy-related methods
   * Spark procedures
   
   Both [Flink](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/table/common/#expanding-table-identifiers) and [Spark](https://spark.apache.org/docs/latest/sql-ref-identifier.html) use backticks for delimited identifiers.
   
   Relates to #5390.
   
   ### Tests
   
   <!-- List UT and IT cases to verify this change -->
   I have added unit tests for the `Identifier.fromString` method.
   
   I will also test this with some Flink procedures.
   
   ### API and Format
   
   <!-- Does this change affect API or storage format -->
   
   ### Documentation
   
   <!-- Does this change introduce a new feature -->
   


