This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 59143c1b2 [hive] HiveCatalog support branch (#3820)
59143c1b2 is described below

commit 59143c1b28074b211408db3fce97a74e8f5d18eb
Author: HeavenZH <[email protected]>
AuthorDate: Thu Aug 1 15:38:08 2024 +0800

    [hive] HiveCatalog support branch (#3820)
    
    This closes #3820.
    
    ---------
    
    Co-authored-by: tsreaper <[email protected]>
---
 .../paimon/table/FallbackReadFileStoreTable.java   | 17 +++++-
 .../java/org/apache/paimon/hive/HiveCatalog.java   | 14 +++--
 .../apache/paimon/hive/HiveCatalogITCaseBase.java  | 62 ++++++++++++++++++++++
 3 files changed, 87 insertions(+), 6 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
index d26ce955a..0cd991b7f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
@@ -34,6 +34,7 @@ import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.Preconditions;
@@ -68,7 +69,7 @@ public class FallbackReadFileStoreTable extends 
DelegatedFileStoreTable {
         RowType mainRowType = main.schema().logicalRowType();
         RowType fallbackRowType = fallback.schema().logicalRowType();
         Preconditions.checkArgument(
-                mainRowType.equals(fallbackRowType),
+                sameRowTypeIgnoreNullable(mainRowType, fallbackRowType),
                 "Branch %s and %s does not have the same row type.\n"
                         + "Row type of branch %s is %s.\n"
                         + "Row type of branch %s is %s.",
@@ -104,6 +105,20 @@ public class FallbackReadFileStoreTable extends 
DelegatedFileStoreTable {
         }
     }
 
+    private boolean sameRowTypeIgnoreNullable(RowType mainRowType, RowType 
fallbackRowType) {
+        if (mainRowType.getFieldCount() != fallbackRowType.getFieldCount()) {
+            return false;
+        }
+        for (int i = 0; i < mainRowType.getFieldCount(); i++) {
+            DataType mainType = mainRowType.getFields().get(i).type();
+            DataType fallbackType = fallbackRowType.getFields().get(i).type();
+            if (!mainType.equalsIgnoreNullable(fallbackType)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
     @Override
     public FileStoreTable copy(Map<String, String> dynamicOptions) {
         return new FallbackReadFileStoreTable(
diff --git 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 667311ad4..4ed7c54d8 100644
--- 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -91,6 +91,7 @@ import static 
org.apache.paimon.hive.HiveCatalogOptions.LOCATION_IN_PROPERTIES;
 import static org.apache.paimon.options.CatalogOptions.ALLOW_UPPER_CASE;
 import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;
 import static 
org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
+import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly;
 
@@ -392,8 +393,6 @@ public class HiveCatalog extends AbstractCatalog {
 
     @Override
     public TableSchema getDataTableSchema(Identifier identifier) throws 
TableNotExistException {
-        assertMainBranch(identifier);
-
         if (!tableExists(identifier)) {
             throw new TableNotExistException(identifier);
         }
@@ -529,12 +528,14 @@ public class HiveCatalog extends AbstractCatalog {
     @Override
     protected void alterTableImpl(Identifier identifier, List<SchemaChange> 
changes)
             throws TableNotExistException, ColumnAlreadyExistException, 
ColumnNotExistException {
-        assertMainBranch(identifier);
-
         final SchemaManager schemaManager = schemaManager(identifier);
         // first commit changes to underlying files
         TableSchema schema = schemaManager.commitChanges(changes);
 
+        // currently only changes to main branch affects metastore
+        if (!DEFAULT_MAIN_BRANCH.equals(identifier.getBranchNameOrDefault())) {
+            return;
+        }
         try {
             Table table =
                     clients.run(
@@ -777,7 +778,10 @@ public class HiveCatalog extends AbstractCatalog {
     }
 
     private SchemaManager schemaManager(Identifier identifier) {
-        return new SchemaManager(fileIO, getDataTableLocation(identifier))
+        return new SchemaManager(
+                        fileIO,
+                        getDataTableLocation(identifier),
+                        identifier.getBranchNameOrDefault())
                 .withLock(lock(identifier));
     }
 
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
index d400ec40b..e0e19b9ba 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
@@ -463,6 +463,58 @@ public abstract class HiveCatalogITCaseBase {
                         "Cannot find table '`my_hive`.`test_db`.`hive_table`' 
in any of the catalogs [default_catalog, my_hive], nor as a temporary table.");
     }
 
+    @Test
+    public void testFlinkCreateBranchAndHiveRead() throws Exception {
+        tEnv.executeSql("CREATE TABLE t ( a INT, b STRING ) WITH ( 
'file.format' = 'avro' )")
+                .await();
+        tEnv.executeSql("CALL sys.create_branch('test_db.t', 'test')").await();
+        tEnv.executeSql("ALTER TABLE `t$branch_test` SET ( 'primary-key' = 
'a', 'bucket' = '1' )")
+                .await();
+        tEnv.executeSql("ALTER TABLE `t$branch_test` ADD (c INT)").await();
+
+        tEnv.executeSql("INSERT INTO `t$branch_test` VALUES (1, 'x1', 10), (2, 
'x2', 20)").await();
+        tEnv.executeSql("INSERT INTO t VALUES (3, 'x3'), (4, 'x4')").await();
+        tEnv.executeSql("INSERT INTO `t$branch_test` VALUES (1, 'x11', 
11)").await();
+        tEnv.executeSql("INSERT INTO t VALUES (3, 'x33')").await();
+
+        assertThat(collect("SELECT * FROM t"))
+                .containsExactlyInAnyOrder(Row.of(3, "x3"), Row.of(3, "x33"), 
Row.of(4, "x4"));
+        assertThat(collect("SELECT * FROM `t$branch_test`"))
+                .containsExactlyInAnyOrder(Row.of(1, "x11", 11), Row.of(2, 
"x2", 20));
+        assertThat(hiveShell.executeQuery("SELECT * FROM t"))
+                .containsExactlyInAnyOrder("3\tx3", "3\tx33", "4\tx4");
+    }
+
+    @Test
+    public void testFallbackBranchRead() throws Exception {
+        tEnv.executeSql(
+                        "CREATE TABLE t ( pt INT, a INT, b STRING ) 
PARTITIONED BY (pt) "
+                                + "WITH ( 'file.format' = 'avro' )")
+                .await();
+        tEnv.executeSql("CALL sys.create_branch('test_db.t', 'test')").await();
+        tEnv.executeSql(
+                        "ALTER TABLE `t$branch_test` SET ( 'primary-key' = 
'pt, a', 'bucket' = '1' )")
+                .await();
+        tEnv.executeSql("ALTER TABLE t SET ( 'scan.fallback-branch' = 'test' 
)").await();
+
+        tEnv.executeSql(
+                        "INSERT INTO `t$branch_test` VALUES "
+                                + "(1, 20, 'cat'), (1, 30, 'dog'), (2, 10, 
'tiger'), (2, 20, 'wolf')")
+                .await();
+        tEnv.executeSql("INSERT INTO t VALUES (1, 10, 'apple'), (1, 20, 
'banana')").await();
+        tEnv.executeSql("INSERT INTO `t$branch_test` VALUES (2, 10, 
'lion')").await();
+
+        assertThat(collect("SELECT * FROM t"))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 10, "apple"),
+                        Row.of(1, 20, "banana"),
+                        Row.of(2, 10, "lion"),
+                        Row.of(2, 20, "wolf"));
+        assertThat(hiveShell.executeQuery("SELECT * FROM t"))
+                .containsExactlyInAnyOrder(
+                        "1\t10\tapple", "1\t20\tbanana", "2\t10\tlion", 
"2\t20\twolf");
+    }
+
     /**
      * Test flink writing and hive reading to compare partitions and 
non-partitions table results.
      */
@@ -1467,4 +1519,14 @@ public abstract class HiveCatalogITCaseBase {
         }
         return result;
     }
+
+    private List<String> collectString(String sql) throws Exception {
+        List<String> result = new ArrayList<>();
+        try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {
+            while (it.hasNext()) {
+                result.add(it.next().toString());
+            }
+        }
+        return result;
+    }
 }

Reply via email to