I think we might need to improve the javadoc of
tableEnv.registerTableSource/registerTableSink.
Currently, the comment says

"Registers an external TableSink with already configured field names and
field types in this TableEnvironment's catalog."

But which catalog? The current one or the default in-memory one?
I think it would be better to improve the description and add a NOTE about
it.

Regards,
Jark

On Tue, 13 Aug 2019 at 10:52, Xuefu Z <usxu...@gmail.com> wrote:

> Yes, tableEnv.registerTable(_) etc. always register in the default catalog.
> To create a table in your custom catalog, you could use
> tableEnv.sqlUpdate("create table ....").
>
> Thanks,
> Xuefu
>
> On Mon, Aug 12, 2019 at 6:17 PM Simon Su <barley...@163.com> wrote:
>
> > Hi Xuefu
> >
> > Thanks for your reply.
> >
> > Actually, I have tried what you advised. I called tableEnv.useCatalog
> > and useDatabase, and I also tried using
> > “catalogname.databasename.tableName” in SQL. I think the root cause is
> > that tableEnv.registerTableSource always uses the “built-in” catalog and
> > database rather than the custom one. So if I want to use a custom one,
> > I have to write code like this:
> >
> > StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
> >     EnvironmentSettings.newInstance()
> >         .useBlinkPlanner()
> >         .inStreamingMode()
> >         .withBuiltInCatalogName("ca1")
> >         .withBuiltInDatabaseName("db1")
> >         .build());
> >
> >
> > As Dawid said, if I want to store tables in my custom catalog, I can call
> > catalog.createTable or use DDL.
> >
> > Thanks,
> > Simon
> >
> > On 08/13/2019 02:55, Xuefu Z <usxu...@gmail.com> wrote:
> >
> > Hi Simon,
> >
> > Thanks for reporting the problem. There are some rough edges around the
> > catalog API and table environments, and we are improving them after the
> > 1.9 release.
> >
> > Nevertheless, tableEnv.registerCatalog() just puts a new catalog in
> > Flink's CatalogManager. It doesn't change the default catalog/database as
> > you expected. To switch to your newly registered catalog, you could call
> > tableEnv.useCatalog() and .useDatabase().
> >
> > As an alternative, you could fully qualify your table name with the
> > "catalog.db.table" syntax without switching the current catalog/database.
> >
> > Please try those and let me know if you find new problems.
> >
> > Thanks,
> > Xuefu
> >
> >
> >
> > On Mon, Aug 12, 2019 at 12:38 AM Simon Su <barley...@163.com> wrote:
> >
> >> Hi All
> >>     I want to use a custom catalog named “ca1” and create a database
> >> under this catalog. When I submit the SQL, it raises an error like this:
> >>
> >>
> >> Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 98 to line 1, column 116: Object 'orderstream' not found within 'ca1.db1'
> >>     at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:125)
> >>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:82)
> >>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:154)
> >>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:89)
> >>     at org.apache.flink.table.planner.delegation.PlannerBase.parse(PlannerBase.scala:130)
> >>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:335)
> >>     at sqlrunner.RowTimeTest.memoryCatalog(RowTimeTest.java:126)
> >>     at sqlrunner.RowTimeTest.main(RowTimeTest.java:137)
> >> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 98 to line 1, column 116: Object 'orderstream' not found within 'ca1.db1'
> >>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> >>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> >>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> >>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> >>     at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
> >>     at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
> >>     at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809)
> >>     at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4805)
> >>     at org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:166)
> >>     at org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177)
> >>     at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> >>     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
> >>     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
> >>     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3109)
> >>     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3091)
> >>     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3363)
> >>     at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
> >>     at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> >>     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
> >>     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
> >>     at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:216)
> >>     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:930)
> >>     at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:637)
> >>     at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:122)
> >>     ... 7 more
> >> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Object 'orderstream' not found within 'ca1.db1'
> >>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> >>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> >>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> >>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> >>     at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
> >>     at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572)
> >>     ... 26 more
> >>
> >> It seems that Calcite cannot find the source object as expected. After
> >> debugging the code, I found that tableEnv.registerTableSource and
> >> registerTableSink use a built-in catalog with a hard-coded catalog name
> >> (default_catalog) and database name (default_database), and that
> >> tableEnv.registerCatalog cannot change this behavior. Is this behavior
> >> reasonable? If I don’t want to use the default built-in catalog and
> >> database, is there any other way to do this?
> >>
> >>
> >> GenericInMemoryCatalog catalog = new GenericInMemoryCatalog("ca1");
> >> // Registering the catalog does not change the built-in catalog!
> >> tableEnv.registerCatalog(catalog.getName(), catalog);
> >> tableEnv.useCatalog(catalog.getName());
> >> catalog.createDatabase("db1",
> >>     new CatalogDatabaseImpl(new HashMap<>(), "comment"), true);
> >> tableEnv.useDatabase("db1");
> >>
> >> // sourceKafka, sinkKafka, csv and schema2 are descriptors defined elsewhere.
> >> tableEnv.connect(sourceKafka)
> >>     .withFormat(csv)
> >>     .withSchema(schema2)
> >>     .inAppendMode()
> >>     .registerTableSource("orderstream");
> >>
> >> tableEnv.connect(sinkKafka)
> >>     .withFormat(csv)
> >>     .withSchema(schema2)
> >>     .inAppendMode()
> >>     .registerTableSink("sinkstream");
> >>
> >> String sql = "insert into ca1.db1.sinkstream " +
> >>     "select tumble_start(ts, INTERVAL '5' SECOND) as t, max(data) " +
> >>     "from ca1.db1.orderstream " +
> >>     "group by tumble(ts, INTERVAL '5' SECOND), data";
> >>
> >> tableEnv.sqlUpdate(sql);
> >> tableEnv.execute("test");
> >>
> >>
> >> Thanks,
> >> Simon
> >>
> >>
> >
> > --
> > Xuefu Zhang
> >
> > "In Honey We Trust!"
> >
> >
>
> --
> Xuefu Zhang
>
> "In Honey We Trust!"
>
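The behaviour discussed in this thread can be illustrated with a small toy model. This is only a sketch, not Flink code: the class CatalogSketch and its methods use, registerTableSource, createTable and exists are hypothetical stand-ins. It assumes, as described above, that registerTableSource always writes into the built-in catalog and database, while useCatalog/useDatabase only change how unqualified names are resolved.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of the catalog resolution described in this thread; not Flink code. */
class CatalogSketch {
    // Built-in names as mentioned in the thread (stand-ins for this sketch).
    static final String BUILT_IN_CATALOG = "default_catalog";
    static final String BUILT_IN_DATABASE = "default_database";

    // Fully qualified table paths of the form "catalog.database.table".
    private final Set<String> tables = new HashSet<>();
    private String currentCatalog = BUILT_IN_CATALOG;
    private String currentDatabase = BUILT_IN_DATABASE;

    /** Models useCatalog/useDatabase: only affects how short names resolve. */
    void use(String catalog, String database) {
        currentCatalog = catalog;
        currentDatabase = database;
    }

    /** Models registerTableSource: always targets the built-in catalog. */
    void registerTableSource(String name) {
        tables.add(BUILT_IN_CATALOG + "." + BUILT_IN_DATABASE + "." + name);
    }

    /** Models DDL or catalog.createTable: targets an explicit catalog. */
    void createTable(String catalog, String database, String name) {
        tables.add(catalog + "." + database + "." + name);
    }

    /** Three-part paths are used as-is; short names use the current catalog. */
    boolean exists(String path) {
        String full = path.split("\\.").length == 3
                ? path
                : currentCatalog + "." + currentDatabase + "." + path;
        return tables.contains(full);
    }

    public static void main(String[] args) {
        CatalogSketch env = new CatalogSketch();
        env.use("ca1", "db1");
        env.registerTableSource("orderstream");
        // The table landed in the built-in catalog, not in ca1.db1.
        System.out.println(env.exists("ca1.db1.orderstream"));
        System.out.println(env.exists("default_catalog.default_database.orderstream"));
        // Creating the table explicitly in ca1.db1 makes the short name resolve.
        env.createTable("ca1", "db1", "orderstream");
        System.out.println(env.exists("orderstream"));
    }
}
```

In this model, the lookup of ca1.db1.orderstream fails for the same reason as the ValidationException reported above: the table exists, but only under the built-in catalog and database.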
