[
https://issues.apache.org/jira/browse/HUDI-5058?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
waywtdcc updated HUDI-5058:
---------------------------
Description:
The primary key cannot be empty when Flink reads an error from the hudi table.
Spark SQL is used to create tables and Spark writes data. Then the flash
reports an error when reading.
spark sql write ,USE HUDI 0.11.1
{code:java}
create table test_hudi_cc16 (
id bigint,
name string,
name2 string,
ts bigint
) using hudi
tblproperties (
type = 'cow',
primaryKey = 'id' );
insert into test_hudi_cc16
values
(1, 'cc2', 'cc32',12);
{code}
flink read, use hudi 0.12.1:
{code:java}
CREATE CATALOG myhudi WITH(
'type' = 'hudi',
'default-database' = 'test_hudi1',
'catalog.path' = '/user/hdpu/warehouse',
'mode' = 'hms',
'hive.conf.dir' = 'hdfs:///user/hdpu/streamx/conf_data/hive_conf'
)
select *
from myhudi.test_hudi6.test_hudi_cc16; {code}
error:
{code:java}
org.apache.flink.table.api.ValidationException: Invalid primary key 'PK_id'.
Column 'id' is nullable. at
org.apache.flink.table.catalog.DefaultSchemaResolver.validatePrimaryKey(DefaultSchemaResolver.java:352)
at
org.apache.flink.table.catalog.DefaultSchemaResolver.resolvePrimaryKey(DefaultSchemaResolver.java:312)
at
org.apache.flink.table.catalog.DefaultSchemaResolver.resolve(DefaultSchemaResolver.java:88)
at org.apache.flink.table.api.Schema.resolve(Schema.java:123)
at
org.apache.flink.table.catalog.CatalogManager.resolveCatalogTable(CatalogManager.java:877)
at
org.apache.flink.table.catalog.CatalogManager.resolveCatalogBaseTable(CatalogManager.java:863)
at
org.apache.flink.table.catalog.CatalogManager.getPermanentTable(CatalogManager.java:426)
at
org.apache.flink.table.catalog.CatalogManager.getTable(CatalogManager.java:395)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1061)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:649)
at
org.grg_banking.flink.sqlexecute.FlinkUtils.exeucteSqlFile2(FlinkUtils.java:260)
at org.apache.flink.catalog.test.TestCatalog.test1(TestCatalog.java:49)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
at
com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at
com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Process finished with exit code -1
{code}
was:
The primary key cannot be empty when Flink reads an error from the hudi table.
Spark SQL is used to create tables and Spark writes data. Then the flash
reports an error when reading.
spark sql
{code:java}
create table test_hudi_cc16 (
id bigint,
name string,
name2 string,
ts bigint
) using hudi
tblproperties (
type = 'cow',
primaryKey = 'id' );
insert into test_hudi_cc16
values
(1, 'cc2', 'cc32',12);
{code}
flink read:
{code:java}
CREATE CATALOG myhudi WITH(
'type' = 'hudi',
'default-database' = 'test_hudi1',
'catalog.path' = '/user/hdpu/warehouse',
'mode' = 'hms',
'hive.conf.dir' = 'hdfs:///user/hdpu/streamx/conf_data/hive_conf'
)
select *
from myhudi.test_hudi6.test_hudi_cc16; {code}
error:
{code:java}
org.apache.flink.table.api.ValidationException: Invalid primary key 'PK_id'.
Column 'id' is nullable. at
org.apache.flink.table.catalog.DefaultSchemaResolver.validatePrimaryKey(DefaultSchemaResolver.java:352)
at
org.apache.flink.table.catalog.DefaultSchemaResolver.resolvePrimaryKey(DefaultSchemaResolver.java:312)
at
org.apache.flink.table.catalog.DefaultSchemaResolver.resolve(DefaultSchemaResolver.java:88)
at org.apache.flink.table.api.Schema.resolve(Schema.java:123)
at
org.apache.flink.table.catalog.CatalogManager.resolveCatalogTable(CatalogManager.java:877)
at
org.apache.flink.table.catalog.CatalogManager.resolveCatalogBaseTable(CatalogManager.java:863)
at
org.apache.flink.table.catalog.CatalogManager.getPermanentTable(CatalogManager.java:426)
at
org.apache.flink.table.catalog.CatalogManager.getTable(CatalogManager.java:395)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1061)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:649)
at
org.grg_banking.flink.sqlexecute.FlinkUtils.exeucteSqlFile2(FlinkUtils.java:260)
at org.apache.flink.catalog.test.TestCatalog.test1(TestCatalog.java:49)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
at
com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at
com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Process finished with exit code -1
{code}
> The primary key cannot be empty when Flink reads an error from the hudi table
> -----------------------------------------------------------------------------
>
> Key: HUDI-5058
> URL: https://issues.apache.org/jira/browse/HUDI-5058
> Project: Apache Hudi
> Issue Type: Bug
> Components: flink
> Affects Versions: 0.12.1
> Reporter: waywtdcc
> Assignee: waywtdcc
> Priority: Major
> Labels: pull-request-available
> Fix For: 0.12.2
>
>
> The primary key cannot be empty when Flink reads an error from the hudi
> table. Spark SQL is used to create tables and Spark writes data. Then the
> flash reports an error when reading.
> spark sql write ,USE HUDI 0.11.1
>
>
> {code:java}
> create table test_hudi_cc16 (
> id bigint,
> name string,
> name2 string,
> ts bigint
> ) using hudi
> tblproperties (
> type = 'cow',
> primaryKey = 'id' );
> insert into test_hudi_cc16
> values
> (1, 'cc2', 'cc32',12);
> {code}
>
> flink read, use hudi 0.12.1:
>
> {code:java}
> CREATE CATALOG myhudi WITH(
> 'type' = 'hudi',
> 'default-database' = 'test_hudi1',
> 'catalog.path' = '/user/hdpu/warehouse',
> 'mode' = 'hms',
> 'hive.conf.dir' = 'hdfs:///user/hdpu/streamx/conf_data/hive_conf'
> )
> select *
> from myhudi.test_hudi6.test_hudi_cc16; {code}
>
> error:
>
> {code:java}
> org.apache.flink.table.api.ValidationException: Invalid primary key 'PK_id'.
> Column 'id' is nullable. at
> org.apache.flink.table.catalog.DefaultSchemaResolver.validatePrimaryKey(DefaultSchemaResolver.java:352)
> at
> org.apache.flink.table.catalog.DefaultSchemaResolver.resolvePrimaryKey(DefaultSchemaResolver.java:312)
> at
> org.apache.flink.table.catalog.DefaultSchemaResolver.resolve(DefaultSchemaResolver.java:88)
> at org.apache.flink.table.api.Schema.resolve(Schema.java:123)
> at
> org.apache.flink.table.catalog.CatalogManager.resolveCatalogTable(CatalogManager.java:877)
> at
> org.apache.flink.table.catalog.CatalogManager.resolveCatalogBaseTable(CatalogManager.java:863)
> at
> org.apache.flink.table.catalog.CatalogManager.getPermanentTable(CatalogManager.java:426)
> at
> org.apache.flink.table.catalog.CatalogManager.getTable(CatalogManager.java:395)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1061)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:649)
> at
> org.grg_banking.flink.sqlexecute.FlinkUtils.exeucteSqlFile2(FlinkUtils.java:260)
> at org.apache.flink.catalog.test.TestCatalog.test1(TestCatalog.java:49)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> at
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
> at
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
> at
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
> at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
> Process finished with exit code -1
> {code}
>
>
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)