I think we might need to improve the javadoc of tableEnv.registerTableSource/registerTableSink. Currently, the comment says
"Registers an external TableSink with already configured field names and field types in this TableEnvironment's catalog."

But which catalog? The current one or the default in-memory one?
I think it would be better to improve the description and add a NOTE about it.

Regards,
Jark

On Tue, 13 Aug 2019 at 10:52, Xuefu Z <usxu...@gmail.com> wrote:

> Yes, tableEnv.registerTable(_) etc. always registers in the default catalog.
> To create a table in your custom catalog, you could use
> tableEnv.sqlUpdate("create table ....").
>
> Thanks,
> Xuefu
>
> On Mon, Aug 12, 2019 at 6:17 PM Simon Su <barley...@163.com> wrote:
>
> > Hi Xuefu
> >
> > Thanks for your reply.
> >
> > Actually I have tried it as you advised. I have tried calling
> > tableEnv.useCatalog and useDatabase. I have also tried using
> > “catalogname.databasename.tableName” in SQL. I think the root cause is
> > that when I call tableEnv.registerTableSource, it always uses a “built-in”
> > Catalog and Database rather than the custom one. So if I want to use a
> > custom one, I have to write code like this:
> >
> > StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
> >     EnvironmentSettings.newInstance()
> >         .useBlinkPlanner()
> >         .inStreamingMode()
> >         .withBuiltInCatalogName("ca1")
> >         .withBuiltInDatabaseName("db1")
> >         .build());
> >
> > As Dawid said, if I want to store tables in my custom catalog, I can call
> > catalog.createTable or use DDL.
> >
> > Thanks,
> > Simon
> >
> > On 08/13/2019 02:55, Xuefu Z <usxu...@gmail.com> wrote:
> >
> > Hi Simon,
> >
> > Thanks for reporting the problem. There are some rough edges around the
> > catalog API and table environments, and we are improving them after the
> > 1.9 release.
> >
> > Nevertheless, tableEnv.registerCatalog() just puts a new catalog in
> > Flink's CatalogManager. It doesn't change the default catalog/database as
> > you expected. To switch to your newly registered catalog, you could call
> > tableEnv.useCatalog() and .useDatabase().
> >
> > As an alternative, you could fully qualify your table name with a
> > "catalog.db.table" syntax without switching the current catalog/database.
> >
> > Please try those and let me know if you find new problems.
> >
> > Thanks,
> > Xuefu
> >
> > On Mon, Aug 12, 2019 at 12:38 AM Simon Su <barley...@163.com> wrote:
> >
> >> Hi All
> >>
> >> I want to use a custom catalog named “ca1” and create a database under
> >> this catalog. When I submit the SQL, it raises an error like this:
> >>
> >> Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 98 to line 1, column 116: Object 'orderstream' not found within 'ca1.db1'
> >>     at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:125)
> >>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:82)
> >>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:154)
> >>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:89)
> >>     at org.apache.flink.table.planner.delegation.PlannerBase.parse(PlannerBase.scala:130)
> >>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:335)
> >>     at sqlrunner.RowTimeTest.memoryCatalog(RowTimeTest.java:126)
> >>     at sqlrunner.RowTimeTest.main(RowTimeTest.java:137)
> >> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 98 to line 1, column 116: Object 'orderstream' not found within 'ca1.db1'
> >>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> >>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> >>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> >>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> >>     at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
> >>     at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
> >>     at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809)
> >>     at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4805)
> >>     at org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:166)
> >>     at org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177)
> >>     at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> >>     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
> >>     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
> >>     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3109)
> >>     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3091)
> >>     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3363)
> >>     at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
> >>     at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> >>     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
> >>     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
> >>     at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:216)
> >>     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:930)
> >>     at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:637)
> >>     at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:122)
> >>     ... 7 more
> >> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Object 'orderstream' not found within 'ca1.db1'
> >>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> >>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> >>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> >>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> >>     at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
> >>     at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572)
> >>     ... 26 more
> >>
> >> It seems that Calcite cannot find the source object as expected. After
> >> debugging the code, I found that tableEnv.registerTableSource and
> >> registerTableSink use a built-in catalog with a hard-coded catalog name
> >> ( default_catalog ) and database name ( default_database ), and that
> >> tableEnv.registerCatalog cannot change this behavior. Is this behavior
> >> reasonable? If I don't want to use the default built-in catalog and
> >> database, is there any other way to do this?
> >>
> >> GenericInMemoryCatalog catalog = new GenericInMemoryCatalog("ca1");
> >> tableEnv.registerCatalog(catalog.getName(), catalog); // Does not change the built-in catalog!
> >> tableEnv.useCatalog(catalog.getName());
> >> catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), "comment"), true);
> >> tableEnv.useDatabase("db1");
> >>
> >> tableEnv.connect(sourceKafka)
> >>     .withFormat(csv)
> >>     .withSchema(schema2)
> >>     .inAppendMode()
> >>     .registerTableSource("orderstream");
> >>
> >> tableEnv.connect(sinkKafka)
> >>     .withFormat(csv)
> >>     .withSchema(schema2)
> >>     .inAppendMode()
> >>     .registerTableSink("sinkstream");
> >>
> >> String sql = "insert into ca1.db1.sinkstream " +
> >>     "select tumble_start(ts, INTERVAL '5' SECOND) as t, max(data) from ca1.db1.orderstream " +
> >>     "group by tumble(ts, INTERVAL '5' SECOND), data";
> >>
> >> tableEnv.sqlUpdate(sql);
> >> tableEnv.execute("test");
> >>
> >> Thanks,
> >> Simon
> >
> > --
> > Xuefu Zhang
> >
> > "In Honey We Trust!"
>
> --
> Xuefu Zhang
>
> "In Honey We Trust!"
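
For readers who want the behavior discussed in this thread in a form they can run without a Flink dependency, here is a minimal toy model in plain Java. It is only a sketch: ToyTableEnv and all of its methods are hypothetical stand-ins written for illustration, not Flink's classes or its actual implementation. The one thing it assumes from the thread is that registerTableSource always writes to the built-in catalog and database, regardless of useCatalog/useDatabase, while lookups resolve against the current catalog and database.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy model of the catalog lookup discussed in this thread; not Flink code. */
public class CatalogResolutionSketch {

    /** Hypothetical stand-in for a table environment. */
    static class ToyTableEnv {
        private final String builtInCatalog;
        private final String builtInDatabase;
        private String currentCatalog;
        private String currentDatabase;
        // catalog name -> set of "database.table" entries
        private final Map<String, Set<String>> catalogs = new HashMap<>();

        ToyTableEnv(String builtInCatalog, String builtInDatabase) {
            this.builtInCatalog = builtInCatalog;
            this.builtInDatabase = builtInDatabase;
            this.currentCatalog = builtInCatalog;
            this.currentDatabase = builtInDatabase;
            catalogs.put(builtInCatalog, new HashSet<>());
        }

        /** Adds a catalog but leaves the current catalog untouched. */
        void registerCatalog(String name) {
            catalogs.putIfAbsent(name, new HashSet<>());
        }

        void useCatalog(String name) {
            if (!catalogs.containsKey(name)) {
                throw new IllegalArgumentException("Unknown catalog: " + name);
            }
            currentCatalog = name;
        }

        void useDatabase(String name) {
            currentDatabase = name;
        }

        /** Assumption from the thread: always targets the built-in catalog and database. */
        void registerTableSource(String table) {
            catalogs.get(builtInCatalog).add(builtInDatabase + "." + table);
        }

        /** Models writing straight into a chosen catalog (catalog.createTable or DDL in the thread). */
        void createTable(String catalog, String database, String table) {
            if (!catalogs.containsKey(catalog)) {
                throw new IllegalArgumentException("Unknown catalog: " + catalog);
            }
            catalogs.get(catalog).add(database + "." + table);
        }

        boolean exists(String catalog, String database, String table) {
            Set<String> tables = catalogs.get(catalog);
            return tables != null && tables.contains(database + "." + table);
        }

        /** Resolves an unqualified table name against the current catalog and database. */
        boolean existsInCurrent(String table) {
            return exists(currentCatalog, currentDatabase, table);
        }
    }

    public static void main(String[] args) {
        ToyTableEnv env = new ToyTableEnv("default_catalog", "default_database");
        env.registerCatalog("ca1");
        env.useCatalog("ca1");
        env.useDatabase("db1");

        // Registered through the environment: lands in the built-in catalog.
        env.registerTableSource("orderstream");
        System.out.println(env.existsInCurrent("orderstream"));
        System.out.println(env.exists("default_catalog", "default_database", "orderstream"));

        // Created directly in the custom catalog: now visible under ca1.db1.
        env.createTable("ca1", "db1", "orderstream");
        System.out.println(env.existsInCurrent("orderstream"));
    }
}
```

In this model the first lookup fails in the same way as the "Object 'orderstream' not found within 'ca1.db1'" error above, and it succeeds only once the table is created in ca1.db1 directly, which corresponds to the catalog.createTable or DDL route suggested in the replies.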