This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 59143c1b2 [hive] HiveCatalog support branch (#3820)
59143c1b2 is described below
commit 59143c1b28074b211408db3fce97a74e8f5d18eb
Author: HeavenZH <[email protected]>
AuthorDate: Thu Aug 1 15:38:08 2024 +0800
[hive] HiveCatalog support branch (#3820)
This closes #3820.
---------
Co-authored-by: tsreaper <[email protected]>
---
.../paimon/table/FallbackReadFileStoreTable.java | 17 +++++-
.../java/org/apache/paimon/hive/HiveCatalog.java | 14 +++--
.../apache/paimon/hive/HiveCatalogITCaseBase.java | 62 ++++++++++++++++++++++
3 files changed, 87 insertions(+), 6 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
index d26ce955a..0cd991b7f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
@@ -34,6 +34,7 @@ import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Preconditions;
@@ -68,7 +69,7 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable {
RowType mainRowType = main.schema().logicalRowType();
RowType fallbackRowType = fallback.schema().logicalRowType();
Preconditions.checkArgument(
- mainRowType.equals(fallbackRowType),
+ sameRowTypeIgnoreNullable(mainRowType, fallbackRowType),
"Branch %s and %s does not have the same row type.\n"
+ "Row type of branch %s is %s.\n"
+ "Row type of branch %s is %s.",
@@ -104,6 +105,20 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable {
}
}
+ private boolean sameRowTypeIgnoreNullable(RowType mainRowType, RowType
fallbackRowType) {
+ if (mainRowType.getFieldCount() != fallbackRowType.getFieldCount()) {
+ return false;
+ }
+ for (int i = 0; i < mainRowType.getFieldCount(); i++) {
+ DataType mainType = mainRowType.getFields().get(i).type();
+ DataType fallbackType = fallbackRowType.getFields().get(i).type();
+ if (!mainType.equalsIgnoreNullable(fallbackType)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
@Override
public FileStoreTable copy(Map<String, String> dynamicOptions) {
return new FallbackReadFileStoreTable(
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 667311ad4..4ed7c54d8 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -91,6 +91,7 @@ import static org.apache.paimon.hive.HiveCatalogOptions.LOCATION_IN_PROPERTIES;
import static org.apache.paimon.options.CatalogOptions.ALLOW_UPPER_CASE;
import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;
import static
org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
+import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly;
@@ -392,8 +393,6 @@ public class HiveCatalog extends AbstractCatalog {
@Override
public TableSchema getDataTableSchema(Identifier identifier) throws
TableNotExistException {
- assertMainBranch(identifier);
-
if (!tableExists(identifier)) {
throw new TableNotExistException(identifier);
}
@@ -529,12 +528,14 @@ public class HiveCatalog extends AbstractCatalog {
@Override
protected void alterTableImpl(Identifier identifier, List<SchemaChange>
changes)
throws TableNotExistException, ColumnAlreadyExistException,
ColumnNotExistException {
- assertMainBranch(identifier);
-
final SchemaManager schemaManager = schemaManager(identifier);
// first commit changes to underlying files
TableSchema schema = schemaManager.commitChanges(changes);
+ // currently only changes to main branch affects metastore
+ if (!DEFAULT_MAIN_BRANCH.equals(identifier.getBranchNameOrDefault())) {
+ return;
+ }
try {
Table table =
clients.run(
@@ -777,7 +778,10 @@ public class HiveCatalog extends AbstractCatalog {
}
private SchemaManager schemaManager(Identifier identifier) {
- return new SchemaManager(fileIO, getDataTableLocation(identifier))
+ return new SchemaManager(
+ fileIO,
+ getDataTableLocation(identifier),
+ identifier.getBranchNameOrDefault())
.withLock(lock(identifier));
}
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
index d400ec40b..e0e19b9ba 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
@@ -463,6 +463,58 @@ public abstract class HiveCatalogITCaseBase {
"Cannot find table '`my_hive`.`test_db`.`hive_table`'
in any of the catalogs [default_catalog, my_hive], nor as a temporary table.");
}
+ @Test
+ public void testFlinkCreateBranchAndHiveRead() throws Exception {
+ tEnv.executeSql("CREATE TABLE t ( a INT, b STRING ) WITH (
'file.format' = 'avro' )")
+ .await();
+ tEnv.executeSql("CALL sys.create_branch('test_db.t', 'test')").await();
+ tEnv.executeSql("ALTER TABLE `t$branch_test` SET ( 'primary-key' =
'a', 'bucket' = '1' )")
+ .await();
+ tEnv.executeSql("ALTER TABLE `t$branch_test` ADD (c INT)").await();
+
+ tEnv.executeSql("INSERT INTO `t$branch_test` VALUES (1, 'x1', 10), (2,
'x2', 20)").await();
+ tEnv.executeSql("INSERT INTO t VALUES (3, 'x3'), (4, 'x4')").await();
+ tEnv.executeSql("INSERT INTO `t$branch_test` VALUES (1, 'x11',
11)").await();
+ tEnv.executeSql("INSERT INTO t VALUES (3, 'x33')").await();
+
+ assertThat(collect("SELECT * FROM t"))
+ .containsExactlyInAnyOrder(Row.of(3, "x3"), Row.of(3, "x33"),
Row.of(4, "x4"));
+ assertThat(collect("SELECT * FROM `t$branch_test`"))
+ .containsExactlyInAnyOrder(Row.of(1, "x11", 11), Row.of(2,
"x2", 20));
+ assertThat(hiveShell.executeQuery("SELECT * FROM t"))
+ .containsExactlyInAnyOrder("3\tx3", "3\tx33", "4\tx4");
+ }
+
+ @Test
+ public void testFallbackBranchRead() throws Exception {
+ tEnv.executeSql(
+ "CREATE TABLE t ( pt INT, a INT, b STRING )
PARTITIONED BY (pt) "
+ + "WITH ( 'file.format' = 'avro' )")
+ .await();
+ tEnv.executeSql("CALL sys.create_branch('test_db.t', 'test')").await();
+ tEnv.executeSql(
+ "ALTER TABLE `t$branch_test` SET ( 'primary-key' =
'pt, a', 'bucket' = '1' )")
+ .await();
+ tEnv.executeSql("ALTER TABLE t SET ( 'scan.fallback-branch' = 'test'
)").await();
+
+ tEnv.executeSql(
+ "INSERT INTO `t$branch_test` VALUES "
+ + "(1, 20, 'cat'), (1, 30, 'dog'), (2, 10,
'tiger'), (2, 20, 'wolf')")
+ .await();
+ tEnv.executeSql("INSERT INTO t VALUES (1, 10, 'apple'), (1, 20,
'banana')").await();
+ tEnv.executeSql("INSERT INTO `t$branch_test` VALUES (2, 10,
'lion')").await();
+
+ assertThat(collect("SELECT * FROM t"))
+ .containsExactlyInAnyOrder(
+ Row.of(1, 10, "apple"),
+ Row.of(1, 20, "banana"),
+ Row.of(2, 10, "lion"),
+ Row.of(2, 20, "wolf"));
+ assertThat(hiveShell.executeQuery("SELECT * FROM t"))
+ .containsExactlyInAnyOrder(
+ "1\t10\tapple", "1\t20\tbanana", "2\t10\tlion",
"2\t20\twolf");
+ }
+
/**
* Test flink writing and hive reading to compare partitions and
non-partitions table results.
*/
@@ -1467,4 +1519,14 @@ public abstract class HiveCatalogITCaseBase {
}
return result;
}
+
+ private List<String> collectString(String sql) throws Exception {
+ List<String> result = new ArrayList<>();
+ try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {
+ while (it.hasNext()) {
+ result.add(it.next().toString());
+ }
+ }
+ return result;
+ }
}