This is an automated email from the ASF dual-hosted git repository.

Zouxxyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e56437ec3a [spark] Support catalog-qualified CREATE TABLE LIKE (#7924)
e56437ec3a is described below

commit e56437ec3a75b354058289212aa1a20585e78dd6
Author: Kerwin Zhang <[email protected]>
AuthorDate: Wed May 27 20:30:41 2026 +0800

    [spark] Support catalog-qualified CREATE TABLE LIKE (#7924)
---
 .../AbstractPaimonSparkSqlExtensionsParser.scala   | 174 +++++++++++++++++++-
 .../PaimonSqlExtensions.g4                         |   7 +-
 .../AbstractPaimonSparkSqlExtensionsParser.scala   | 173 +++++++++++++++++++-
 .../extensions/PaimonSqlExtensionsAstBuilder.scala |  53 ++++++
 .../sql/CatalogQualifiedCreateTableLikeTest.scala  | 180 +++++++++++++++++++++
 5 files changed, 583 insertions(+), 4 deletions(-)

diff --git 
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
 
b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
index 67f72d953a..3529944f37 100644
--- 
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
+++ 
b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
@@ -52,12 +52,46 @@ import scala.collection.JavaConverters._
  * @param delegate
  *   The extension parser.
  */
+// Keep this class in the Spark 4.0 module so it is compiled against Spark 
4.0's ParserInterface.
 abstract class AbstractPaimonSparkSqlExtensionsParser(val delegate: 
ParserInterface)
   extends org.apache.spark.sql.catalyst.parser.ParserInterface
   with Logging {
 
   private lazy val substitutor = new VariableSubstitution()
   private lazy val astBuilder = new PaimonSqlExtensionsAstBuilder(delegate)
+  private val nonReservedIdentifierTokenTypes = Set(
+    PaimonSqlExtensionsParser.ALTER,
+    PaimonSqlExtensionsParser.AS,
+    PaimonSqlExtensionsParser.CALL,
+    PaimonSqlExtensionsParser.CREATE,
+    PaimonSqlExtensionsParser.DAYS,
+    PaimonSqlExtensionsParser.DELETE,
+    PaimonSqlExtensionsParser.EXISTS,
+    PaimonSqlExtensionsParser.HOURS,
+    PaimonSqlExtensionsParser.IF,
+    PaimonSqlExtensionsParser.LIKE,
+    PaimonSqlExtensionsParser.NOT,
+    PaimonSqlExtensionsParser.OF,
+    PaimonSqlExtensionsParser.OR,
+    PaimonSqlExtensionsParser.TABLE,
+    PaimonSqlExtensionsParser.REPLACE,
+    PaimonSqlExtensionsParser.RETAIN,
+    PaimonSqlExtensionsParser.VERSION,
+    PaimonSqlExtensionsParser.TAG,
+    PaimonSqlExtensionsParser.TRUE,
+    PaimonSqlExtensionsParser.FALSE,
+    PaimonSqlExtensionsParser.MAP,
+    PaimonSqlExtensionsParser.COPY,
+    PaimonSqlExtensionsParser.INTO,
+    PaimonSqlExtensionsParser.FROM,
+    PaimonSqlExtensionsParser.FILE_FORMAT,
+    PaimonSqlExtensionsParser.PATTERN,
+    PaimonSqlExtensionsParser.FORCE,
+    PaimonSqlExtensionsParser.ON_ERROR,
+    PaimonSqlExtensionsParser.ABORT_STATEMENT,
+    PaimonSqlExtensionsParser.OVERWRITE,
+    PaimonSqlExtensionsParser.CSV
+  )
 
   /** Parses a string to a LogicalPlan. */
   override def parsePlan(sqlText: String): LogicalPlan = {
@@ -66,7 +100,14 @@ abstract class AbstractPaimonSparkSqlExtensionsParser(val 
delegate: ParserInterf
       parse(sqlTextAfterSubstitution)(parser => 
astBuilder.visit(parser.singleStatement()))
         .asInstanceOf[LogicalPlan]
     } else {
-      var plan = delegate.parsePlan(sqlText)
+      var plan =
+        try {
+          delegate.parsePlan(sqlText)
+        } catch {
+          case _: ParseException if 
maybeCatalogCreateTableLike(sqlTextAfterSubstitution) =>
+            parse(sqlTextAfterSubstitution)(parser => 
astBuilder.visit(parser.singleStatement()))
+              .asInstanceOf[LogicalPlan]
+        }
       val sparkSession = PaimonSparkSession.active
       parserRules(sparkSession).foreach(
         rule => {
@@ -144,6 +185,137 @@ abstract class AbstractPaimonSparkSqlExtensionsParser(val 
delegate: ParserInterf
     normalized.startsWith("copy into")
   }
 
+  /**
+   * Cheap token-level check for `CREATE TABLE [IF NOT EXISTS] x.y[.z] LIKE 
...` shape. Used as a
+   * gate for the Paimon parser fallback when the delegate parser rejects a 
catalog-qualified CREATE
+   * TABLE LIKE statement.
+   */
+  private def maybeCatalogCreateTableLike(sqlText: String): Boolean = {
+    if (org.apache.spark.SPARK_VERSION < "3.4") {
+      return false
+    }
+    if (!startsWithCreateTable(sqlText)) {
+      return false
+    }
+
+    tokenStream(sqlText) match {
+      case Some(tokens) => maybeCreateTableLike(tokens)
+      case None => false
+    }
+  }
+
+  private def tokenStream(sqlText: String): Option[CommonTokenStream] = {
+    try {
+      val lexer = new PaimonSqlExtensionsLexer(
+        new UpperCaseCharStream(CharStreams.fromString(sqlText)))
+      lexer.removeErrorListeners()
+      lexer.addErrorListener(PaimonParseErrorListener)
+
+      val tokens = new CommonTokenStream(lexer)
+      tokens.fill()
+      Some(tokens)
+    } catch {
+      case _: PaimonParseException => None
+    }
+  }
+
+  private def maybeCreateTableLike(tokenStream: CommonTokenStream): Boolean = {
+    val tokens = tokenStream.getTokens.asScala
+      .filter(token => token.getChannel == Token.DEFAULT_CHANNEL)
+      .filterNot(token => token.getType == Token.EOF)
+
+    if (tokens.length < 5) return false
+    if (tokens(0).getType != PaimonSqlExtensionsParser.CREATE) return false
+    if (tokens(1).getType != PaimonSqlExtensionsParser.TABLE) return false
+
+    var idx = 2
+    if (
+      idx + 2 < tokens.length &&
+      tokens(idx).getType == PaimonSqlExtensionsParser.IF &&
+      tokens(idx + 1).getType == PaimonSqlExtensionsParser.NOT &&
+      tokens(idx + 2).getType == PaimonSqlExtensionsParser.EXISTS
+    ) {
+      idx += 3
+    }
+
+    if (idx >= tokens.length || !isIdentifierToken(tokens(idx))) return false
+    idx += 1
+
+    while (
+      idx + 1 < tokens.length &&
+      tokens(idx).getText == "." &&
+      isIdentifierToken(tokens(idx + 1))
+    ) {
+      idx += 2
+    }
+
+    idx < tokens.length && tokens(idx).getType == 
PaimonSqlExtensionsParser.LIKE
+  }
+
+  private def isIdentifierToken(token: Token): Boolean = {
+    token.getType == PaimonSqlExtensionsParser.IDENTIFIER ||
+    token.getType == PaimonSqlExtensionsParser.BACKQUOTED_IDENTIFIER ||
+    nonReservedIdentifierTokenTypes.contains(token.getType)
+  }
+
+  private def startsWithCreateTable(sqlText: String): Boolean = {
+    val createIndex = skipWhitespaceAndComments(sqlText, 0)
+    if (!matchesWord(sqlText, createIndex, "create")) {
+      return false
+    }
+
+    val tableIndex = skipWhitespaceAndComments(sqlText, createIndex + 
"create".length)
+    matchesWord(sqlText, tableIndex, "table")
+  }
+
+  private def skipWhitespaceAndComments(sqlText: String, start: Int): Int = {
+    var index = start
+    var continue = true
+
+    while (continue) {
+      while (index < sqlText.length && sqlText.charAt(index).isWhitespace) {
+        index += 1
+      }
+
+      if (
+        index + 1 < sqlText.length &&
+        sqlText.charAt(index) == '-' &&
+        sqlText.charAt(index + 1) == '-'
+      ) {
+        index += 2
+        while (
+          index < sqlText.length &&
+          sqlText.charAt(index) != '\n' &&
+          sqlText.charAt(index) != '\r'
+        ) {
+          index += 1
+        }
+      } else if (
+        index + 1 < sqlText.length &&
+        sqlText.charAt(index) == '/' &&
+        sqlText.charAt(index + 1) == '*'
+      ) {
+        val close = sqlText.indexOf("*/", index + 2)
+        index = if (close >= 0) close + 2 else sqlText.length
+      } else {
+        continue = false
+      }
+    }
+
+    index
+  }
+
+  private def matchesWord(sqlText: String, index: Int, word: String): Boolean 
= {
+    index + word.length <= sqlText.length &&
+    sqlText.regionMatches(true, index, word, 0, word.length) &&
+    (index + word.length == sqlText.length ||
+      !isIdentifierPart(sqlText.charAt(index + word.length)))
+  }
+
+  private def isIdentifierPart(char: Char): Boolean = {
+    char.isLetterOrDigit || char == '_'
+  }
+
   protected def parse[T](command: String)(toResult: PaimonSqlExtensionsParser 
=> T): T = {
     val lexer = new PaimonSqlExtensionsLexer(
       new UpperCaseCharStream(CharStreams.fromString(command)))
diff --git 
a/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
 
b/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
index 4255e530c1..12a5bc8c51 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
+++ 
b/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
@@ -84,6 +84,8 @@ statement
       FROM multipartIdentifier
       fileFormatClause
       overwriteClause?                                                         
             #copyIntoLocation
+    | CREATE TABLE (IF NOT EXISTS)? target=multipartIdentifier
+        LIKE source=multipartIdentifier ( . )*?                                
             #createTableLike
   ;
 
 callArgument
@@ -197,8 +199,8 @@ quotedIdentifier
     ;
 
 nonReserved
-    : ALTER | AS | CALL | CREATE | DAYS | DELETE | EXISTS | HOURS | IF | NOT | 
OF | OR | TABLE
-    | REPLACE | RETAIN | VERSION | TAG
+    : ALTER | AS | CALL | CREATE | DAYS | DELETE | EXISTS | HOURS | IF | LIKE
+    | NOT | OF | OR | TABLE | REPLACE | RETAIN | VERSION | TAG
     | TRUE | FALSE
     | MAP
     | COPY | INTO | FROM | FILE_FORMAT | PATTERN | FORCE | ON_ERROR | 
ABORT_STATEMENT | OVERWRITE
@@ -214,6 +216,7 @@ DELETE: 'DELETE';
 EXISTS: 'EXISTS';
 HOURS: 'HOURS';
 IF : 'IF';
+LIKE: 'LIKE';
 MINUTES: 'MINUTES';
 NOT: 'NOT';
 OF: 'OF';
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
index 67f72d953a..1e0c13a573 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
@@ -58,6 +58,39 @@ abstract class AbstractPaimonSparkSqlExtensionsParser(val 
delegate: ParserInterf
 
   private lazy val substitutor = new VariableSubstitution()
   private lazy val astBuilder = new PaimonSqlExtensionsAstBuilder(delegate)
+  private val nonReservedIdentifierTokenTypes = Set(
+    PaimonSqlExtensionsParser.ALTER,
+    PaimonSqlExtensionsParser.AS,
+    PaimonSqlExtensionsParser.CALL,
+    PaimonSqlExtensionsParser.CREATE,
+    PaimonSqlExtensionsParser.DAYS,
+    PaimonSqlExtensionsParser.DELETE,
+    PaimonSqlExtensionsParser.EXISTS,
+    PaimonSqlExtensionsParser.HOURS,
+    PaimonSqlExtensionsParser.IF,
+    PaimonSqlExtensionsParser.LIKE,
+    PaimonSqlExtensionsParser.NOT,
+    PaimonSqlExtensionsParser.OF,
+    PaimonSqlExtensionsParser.OR,
+    PaimonSqlExtensionsParser.TABLE,
+    PaimonSqlExtensionsParser.REPLACE,
+    PaimonSqlExtensionsParser.RETAIN,
+    PaimonSqlExtensionsParser.VERSION,
+    PaimonSqlExtensionsParser.TAG,
+    PaimonSqlExtensionsParser.TRUE,
+    PaimonSqlExtensionsParser.FALSE,
+    PaimonSqlExtensionsParser.MAP,
+    PaimonSqlExtensionsParser.COPY,
+    PaimonSqlExtensionsParser.INTO,
+    PaimonSqlExtensionsParser.FROM,
+    PaimonSqlExtensionsParser.FILE_FORMAT,
+    PaimonSqlExtensionsParser.PATTERN,
+    PaimonSqlExtensionsParser.FORCE,
+    PaimonSqlExtensionsParser.ON_ERROR,
+    PaimonSqlExtensionsParser.ABORT_STATEMENT,
+    PaimonSqlExtensionsParser.OVERWRITE,
+    PaimonSqlExtensionsParser.CSV
+  )
 
   /** Parses a string to a LogicalPlan. */
   override def parsePlan(sqlText: String): LogicalPlan = {
@@ -66,7 +99,14 @@ abstract class AbstractPaimonSparkSqlExtensionsParser(val 
delegate: ParserInterf
       parse(sqlTextAfterSubstitution)(parser => 
astBuilder.visit(parser.singleStatement()))
         .asInstanceOf[LogicalPlan]
     } else {
-      var plan = delegate.parsePlan(sqlText)
+      var plan =
+        try {
+          delegate.parsePlan(sqlText)
+        } catch {
+          case _: ParseException if 
maybeCatalogCreateTableLike(sqlTextAfterSubstitution) =>
+            parse(sqlTextAfterSubstitution)(parser => 
astBuilder.visit(parser.singleStatement()))
+              .asInstanceOf[LogicalPlan]
+        }
       val sparkSession = PaimonSparkSession.active
       parserRules(sparkSession).foreach(
         rule => {
@@ -144,6 +184,137 @@ abstract class AbstractPaimonSparkSqlExtensionsParser(val 
delegate: ParserInterf
     normalized.startsWith("copy into")
   }
 
+  /**
+   * Cheap token-level check for `CREATE TABLE [IF NOT EXISTS] x.y[.z] LIKE 
...` shape. Used as a
+   * gate for the Paimon parser fallback when the delegate parser rejects a 
catalog-qualified CREATE
+   * TABLE LIKE statement.
+   */
+  private def maybeCatalogCreateTableLike(sqlText: String): Boolean = {
+    if (org.apache.spark.SPARK_VERSION < "3.4") {
+      return false
+    }
+    if (!startsWithCreateTable(sqlText)) {
+      return false
+    }
+
+    tokenStream(sqlText) match {
+      case Some(tokens) => maybeCreateTableLike(tokens)
+      case None => false
+    }
+  }
+
+  private def tokenStream(sqlText: String): Option[CommonTokenStream] = {
+    try {
+      val lexer = new PaimonSqlExtensionsLexer(
+        new UpperCaseCharStream(CharStreams.fromString(sqlText)))
+      lexer.removeErrorListeners()
+      lexer.addErrorListener(PaimonParseErrorListener)
+
+      val tokens = new CommonTokenStream(lexer)
+      tokens.fill()
+      Some(tokens)
+    } catch {
+      case _: PaimonParseException => None
+    }
+  }
+
+  private def maybeCreateTableLike(tokenStream: CommonTokenStream): Boolean = {
+    val tokens = tokenStream.getTokens.asScala
+      .filter(token => token.getChannel == Token.DEFAULT_CHANNEL)
+      .filterNot(token => token.getType == Token.EOF)
+
+    if (tokens.length < 5) return false
+    if (tokens(0).getType != PaimonSqlExtensionsParser.CREATE) return false
+    if (tokens(1).getType != PaimonSqlExtensionsParser.TABLE) return false
+
+    var idx = 2
+    if (
+      idx + 2 < tokens.length &&
+      tokens(idx).getType == PaimonSqlExtensionsParser.IF &&
+      tokens(idx + 1).getType == PaimonSqlExtensionsParser.NOT &&
+      tokens(idx + 2).getType == PaimonSqlExtensionsParser.EXISTS
+    ) {
+      idx += 3
+    }
+
+    if (idx >= tokens.length || !isIdentifierToken(tokens(idx))) return false
+    idx += 1
+
+    while (
+      idx + 1 < tokens.length &&
+      tokens(idx).getText == "." &&
+      isIdentifierToken(tokens(idx + 1))
+    ) {
+      idx += 2
+    }
+
+    idx < tokens.length && tokens(idx).getType == 
PaimonSqlExtensionsParser.LIKE
+  }
+
+  private def isIdentifierToken(token: Token): Boolean = {
+    token.getType == PaimonSqlExtensionsParser.IDENTIFIER ||
+    token.getType == PaimonSqlExtensionsParser.BACKQUOTED_IDENTIFIER ||
+    nonReservedIdentifierTokenTypes.contains(token.getType)
+  }
+
+  private def startsWithCreateTable(sqlText: String): Boolean = {
+    val createIndex = skipWhitespaceAndComments(sqlText, 0)
+    if (!matchesWord(sqlText, createIndex, "create")) {
+      return false
+    }
+
+    val tableIndex = skipWhitespaceAndComments(sqlText, createIndex + 
"create".length)
+    matchesWord(sqlText, tableIndex, "table")
+  }
+
+  private def skipWhitespaceAndComments(sqlText: String, start: Int): Int = {
+    var index = start
+    var continue = true
+
+    while (continue) {
+      while (index < sqlText.length && sqlText.charAt(index).isWhitespace) {
+        index += 1
+      }
+
+      if (
+        index + 1 < sqlText.length &&
+        sqlText.charAt(index) == '-' &&
+        sqlText.charAt(index + 1) == '-'
+      ) {
+        index += 2
+        while (
+          index < sqlText.length &&
+          sqlText.charAt(index) != '\n' &&
+          sqlText.charAt(index) != '\r'
+        ) {
+          index += 1
+        }
+      } else if (
+        index + 1 < sqlText.length &&
+        sqlText.charAt(index) == '/' &&
+        sqlText.charAt(index + 1) == '*'
+      ) {
+        val close = sqlText.indexOf("*/", index + 2)
+        index = if (close >= 0) close + 2 else sqlText.length
+      } else {
+        continue = false
+      }
+    }
+
+    index
+  }
+
+  private def matchesWord(sqlText: String, index: Int, word: String): Boolean 
= {
+    index + word.length <= sqlText.length &&
+    sqlText.regionMatches(true, index, word, 0, word.length) &&
+    (index + word.length == sqlText.length ||
+      !isIdentifierPart(sqlText.charAt(index + word.length)))
+  }
+
+  private def isIdentifierPart(char: Char): Boolean = {
+    char.isLetterOrDigit || char == '_'
+  }
+
   protected def parse[T](command: String)(toResult: PaimonSqlExtensionsParser 
=> T): T = {
     val lexer = new PaimonSqlExtensionsLexer(
       new UpperCaseCharStream(CharStreams.fromString(command)))
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala
index dd3f3c4d15..da716ced11 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala
@@ -26,11 +26,13 @@ import org.antlr.v4.runtime._
 import org.antlr.v4.runtime.misc.Interval
 import org.antlr.v4.runtime.tree.{ParseTree, TerminalNode}
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import 
org.apache.spark.sql.catalyst.parser.extensions.PaimonParserUtils.withOrigin
 import 
org.apache.spark.sql.catalyst.parser.extensions.PaimonSqlExtensionsParser._
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.command.{CreateTableLikeCommand => 
SparkCreateTableLikeCommand}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -99,6 +101,13 @@ class PaimonSqlExtensionsAstBuilder(delegate: 
ParserInterface)
     ShowTagsCommand(typedVisit[Seq[String]](ctx.multipartIdentifier))
   }
 
+  /** Create a CREATE TABLE LIKE logical command. */
+  override def visitCreateTableLike(ctx: CreateTableLikeContext): LogicalPlan 
= withOrigin(ctx) {
+    sparkCreateTableLikeCommand(ctx).copy(
+      targetTable = toTableIdentifier(typedVisit[Seq[String]](ctx.target)),
+      sourceTable = toTableIdentifier(typedVisit[Seq[String]](ctx.source)))
+  }
+
   /** Create a CREATE OR REPLACE TAG logical command. */
   override def visitCreateOrReplaceTag(ctx: CreateOrReplaceTagContext): 
CreateOrReplaceTagCommand =
     withOrigin(ctx) {
@@ -253,6 +262,50 @@ class PaimonSqlExtensionsAstBuilder(delegate: 
ParserInterface)
 
   private def toSeq[T](list: java.util.List[T]) = toBuffer(list)
 
+  private def toTableIdentifier(identifier: Seq[String]): TableIdentifier = {
+    identifier match {
+      case Seq(table) =>
+        TableIdentifier(table)
+      case Seq(database, table) =>
+        TableIdentifier(table, Some(database))
+      case parts =>
+        TableIdentifier(
+          parts.last,
+          Some(parts.slice(1, parts.length - 1).mkString(".")),
+          Some(parts.head))
+    }
+  }
+
+  private def sparkCreateTableLikeCommand(
+      ctx: CreateTableLikeContext): SparkCreateTableLikeCommand = {
+    delegate.parsePlan(createSparkCreateTableLikeSql(ctx)) match {
+      case command: SparkCreateTableLikeCommand => command
+      case plan =>
+        throw new UnsupportedOperationException(
+          s"Expected Spark CREATE TABLE LIKE command, but got 
${plan.nodeName}.")
+    }
+  }
+
+  private def createSparkCreateTableLikeSql(ctx: CreateTableLikeContext): 
String = {
+    val stream = ctx.getStart.getInputStream
+    val baseStart = ctx.getStart.getStartIndex
+    val baseStop = ctx.getStop.getStopIndex
+    val targetStart = ctx.target.getStart.getStartIndex
+    val targetStop = ctx.target.getStop.getStopIndex
+    val sourceStart = ctx.source.getStart.getStartIndex
+    val sourceStop = ctx.source.getStop.getStopIndex
+
+    val prefix = stream.getText(Interval.of(baseStart, targetStart - 1))
+    val middle = stream.getText(Interval.of(targetStop + 1, sourceStart - 1))
+    val suffix = if (sourceStop < baseStop) {
+      stream.getText(Interval.of(sourceStop + 1, baseStop))
+    } else {
+      ""
+    }
+
+    prefix + "__paimon_create_like_target" + middle + 
"__paimon_create_like_source" + suffix
+  }
+
   private def reconstructSqlString(ctx: ParserRuleContext): String = {
     toBuffer(ctx.children)
       .map {
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CatalogQualifiedCreateTableLikeTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CatalogQualifiedCreateTableLikeTest.scala
new file mode 100644
index 0000000000..03e109919b
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CatalogQualifiedCreateTableLikeTest.scala
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.spark.commands.PaimonCreateTableLikeCommand
+
+import org.junit.jupiter.api.Assertions
+
+import scala.collection.JavaConverters._
+
+class CatalogQualifiedCreateTableLikeTest extends PaimonSparkTestBase {
+
+  test("Create table like with catalog-qualified identifiers") {
+    assume(gteqSpark3_4)
+    withTable("source_tbl", "target_tbl", "target_from_qualified_source", 
"qualified_target") {
+      createSourceTable()
+
+      sql(s"CREATE TABLE paimon.$dbName0.target_tbl LIKE 
paimon.$dbName0.source_tbl")
+      assertCreatedLike("target_tbl")
+
+      sql(s"CREATE TABLE target_from_qualified_source LIKE 
paimon.$dbName0.source_tbl")
+      assertCreatedLike("target_from_qualified_source")
+
+      sql(s"CREATE TABLE paimon.$dbName0.qualified_target LIKE source_tbl")
+      assertCreatedLike("qualified_target")
+    }
+  }
+
+  test("Create table like if not exists with catalog-qualified identifiers") {
+    assume(gteqSpark3_4)
+    withTable("source_tbl", "target_tbl") {
+      createSourceTable()
+      sql("""
+            |CREATE TABLE target_tbl (
+            |  id BIGINT,
+            |  pt STRING
+            |) COMMENT 'target comment'
+            |PARTITIONED BY (pt)
+            |TBLPROPERTIES (
+            |  'primary-key' = 'id,pt',
+            |  'bucket' = '3'
+            |)
+            |""".stripMargin)
+
+      val targetSchema = spark.table("target_tbl").schema
+      val targetLocation = loadTable("target_tbl").location().toString
+
+      sql(s"""
+             |CREATE TABLE IF NOT EXISTS paimon.$dbName0.target_tbl
+             |LIKE paimon.$dbName0.source_tbl
+             |""".stripMargin)
+
+      val target = loadTable("target_tbl")
+      Assertions.assertEquals(targetSchema, spark.table("target_tbl").schema)
+      
Assertions.assertFalse(spark.table("target_tbl").schema.fieldNames.contains("name"))
+      Assertions.assertEquals("target comment", target.comment().get())
+      Assertions.assertEquals("3", target.options().get("bucket"))
+      Assertions.assertEquals(targetLocation, target.location().toString)
+    }
+  }
+
+  test("Create table like clauses with catalog-qualified identifiers") {
+    assume(gteqSpark3_4)
+    withTable("source_tbl", "target_tbl") {
+      createSourceTable()
+
+      sql(s"""
+             |CREATE TABLE paimon.$dbName0.target_tbl
+             |LIKE paimon.$dbName0.source_tbl
+             |USING paimon
+             |TBLPROPERTIES (
+             |  'bucket' = '8',
+             |  'target-file-size' = '256MB'
+             |)
+             |""".stripMargin)
+
+      val source = loadTable("source_tbl")
+      val target = loadTable("target_tbl")
+      Assertions.assertEquals(spark.table("source_tbl").schema, 
spark.table("target_tbl").schema)
+      Assertions.assertEquals("source comment", target.comment().get())
+      Assertions.assertEquals(List("pt"), 
target.partitionKeys().asScala.toList)
+      Assertions.assertEquals(List("id", "pt"), 
target.primaryKeys().asScala.toList)
+      Assertions.assertEquals("8", target.options().get("bucket"))
+      Assertions.assertEquals("256MB", 
target.options().get("target-file-size"))
+      Assertions.assertNotEquals(source.location().toString, 
target.location().toString)
+    }
+  }
+
+  test("Create table like stored as is unsupported with catalog-qualified 
identifiers") {
+    assume(gteqSpark3_4)
+    withTable("source_tbl", "target_tbl") {
+      sql("CREATE TABLE source_tbl (id INT)")
+
+      val error = intercept[Exception] {
+        sql(s"""
+               |CREATE TABLE paimon.$dbName0.target_tbl
+               |LIKE paimon.$dbName0.source_tbl
+               |STORED AS PARQUET
+               |""".stripMargin)
+      }.getMessage
+
+      Assertions.assertTrue(
+        error.contains("CREATE TABLE LIKE ... STORED AS is not supported for 
SparkCatalog."))
+    }
+  }
+
+  test("Create table like parser accepts non-reserved and nested identifiers") 
{
+    assume(gteqSpark3_4)
+
+    val nonReservedIdentifierCommand =
+      parseCreateTableLikeCommand("CREATE TABLE paimon.test.tag LIKE 
paimon.test.source_tbl")
+    Assertions.assertEquals("tag", 
nonReservedIdentifierCommand.targetIdent.name())
+    Assertions.assertEquals(Seq("test"), 
nonReservedIdentifierCommand.targetIdent.namespace().toSeq)
+
+    val nestedIdentifierCommand =
+      parseCreateTableLikeCommand(
+        "CREATE TABLE paimon.test.extra.target_tbl LIKE 
paimon.test.extra.source_tbl")
+    Assertions.assertEquals("target_tbl", 
nestedIdentifierCommand.targetIdent.name())
+    Assertions.assertEquals(
+      Seq("test.extra"),
+      nestedIdentifierCommand.targetIdent.namespace().toSeq)
+    Assertions.assertEquals("source_tbl", 
nestedIdentifierCommand.sourceIdent.name())
+    Assertions.assertEquals(
+      Seq("test.extra"),
+      nestedIdentifierCommand.sourceIdent.namespace().toSeq)
+  }
+
+  private def createSourceTable(): Unit = {
+    sql("""
+          |CREATE TABLE source_tbl (
+          |  id BIGINT,
+          |  name STRING COMMENT 'name column',
+          |  pt STRING
+          |) COMMENT 'source comment'
+          |PARTITIONED BY (pt)
+          |TBLPROPERTIES (
+          |  'primary-key' = 'id,pt',
+          |  'bucket' = '2',
+          |  'target-file-size' = '64MB'
+          |)
+          |""".stripMargin)
+  }
+
+  private def assertCreatedLike(tableName: String): Unit = {
+    val target = loadTable(tableName)
+
+    Assertions.assertEquals(spark.table("source_tbl").schema, 
spark.table(tableName).schema)
+    Assertions.assertEquals("source comment", target.comment().get())
+    Assertions.assertEquals(List("pt"), target.partitionKeys().asScala.toList)
+    Assertions.assertEquals(List("id", "pt"), 
target.primaryKeys().asScala.toList)
+    Assertions.assertEquals("2", target.options().get("bucket"))
+    Assertions.assertEquals("64MB", target.options().get("target-file-size"))
+  }
+
+  private def parseCreateTableLikeCommand(sqlText: String): 
PaimonCreateTableLikeCommand = {
+    spark.sessionState.sqlParser.parsePlan(sqlText) match {
+      case command: PaimonCreateTableLikeCommand => command
+      case plan =>
+        throw new AssertionError(
+          s"Expected PaimonCreateTableLikeCommand, but got ${plan.nodeName}.")
+    }
+  }
+}

Reply via email to