Hello Timo,

I have implemented my own scalar function as below:

import com.mongodb.BasicDBObject;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.functions.ScalarFunction;

public class AccessBasicDBObject extends ScalarFunction {

    // Returns the value stored under key, or "" when it is absent.
    public String eval(String key, BasicDBObject basicDBObject) {
        String value = basicDBObject.getString(key);
        return value != null ? value : "";
    }

    @Override
    public TypeInformation<?> getResultType(Class<?>[] signature) {
        return Types.STRING;
    }
}
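As a side check, the null handling in eval can be exercised without a Flink cluster. The sketch below uses a plain Map in place of BasicDBObject (an assumption, purely so it runs without the Mongo driver); the value-or-empty-string behaviour is the same:

```java
import java.util.HashMap;
import java.util.Map;

public class AccessBasicDBObjectSketch {

    // Mirrors eval(): return the value for key, or "" when it is absent.
    // A plain Map stands in for BasicDBObject so this compiles without the Mongo driver.
    public static String eval(String key, Map<String, String> doc) {
        String value = doc.get(key);
        return value != null ? value : "";
    }

    public static void main(String[] args) {
        Map<String, String> doc = new HashMap<>();
        doc.put("applicationId", "APP-42");

        System.out.println(eval("applicationId", doc)); // prints APP-42
        System.out.println(eval("missing", doc).isEmpty()); // prints true
    }
}
```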

Table master = table1.filter("ns === 'Master'").select("o as master, 'accessBasicDBObject(applicationId,o)' as primaryKey");
Table child1 = table1.filter("ns === 'Child1'").select("o as child1, 'accessBasicDBObject(applicationId,o)' as foreignKey");
Table child2 = table1.filter("ns === 'Child2'").select("o as child2, 'accessBasicDBObject(applicationId,o)' as foreignKey2");

Table result = master.join(child1).where("primaryKey==foreignKey").join(child2).where("primaryKey==foreignKey2");

and it is giving me the following error on the line:

DataStream<Row> rowDataStream = tableEnv.toDataStream(result, Row.class);

Exception in thread "Thread-27" org.apache.flink.table.api.TableException:
Cannot generate a valid execution plan for the given query:

LogicalJoin(condition=[true], joinType=[inner])
  LogicalJoin(condition=[true], joinType=[inner])
    LogicalProject(master=[$2], primaryKey=[_UTF-16LE'accessBasicDBObject(loanApplicationId,o)'])
      LogicalFilter(condition=[=($1, _UTF-16LE'analyticDB.customerMISMaster')])
        LogicalTableScan(table=[[_DataStreamTable_0]])
    LogicalProject(child1=[$2], foreignKey=[_UTF-16LE'accessBasicDBObject(loanApplicationId,o)'])
      LogicalFilter(condition=[=($1, _UTF-16LE'analyticDB.customerMISChild1')])
        LogicalTableScan(table=[[_DataStreamTable_0]])
  LogicalProject(child2=[$2], foreignKey2=[_UTF-16LE'accessBasicDBObject(loanApplicationId,o)'])
    LogicalFilter(condition=[=($1, _UTF-16LE'analyticDB.customerMISChild2')])
      LogicalTableScan(table=[[_DataStreamTable_0]])

This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL
features.
at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:274)
at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:722)
at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:778)
at org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:308)
at org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:262)
at org.apache.flink.table.api.java.StreamTableEnvironment.toDataStream(StreamTableEnvironment.scala:159)
at com.softcell.streaming.flink.StreamingOperations$2.run(StreamingOperations.java:168)
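For completeness, here is roughly how a scalar function is wired up in the 1.5-era Java Table API. This is only a sketch reusing the names from the code above and is not verified against this exact setup; note that the function is registered by name and the call itself is not quoted in the expression string (only the key is a string literal):

```java
// Sketch only (names from the code above; assumes the Flink 1.5-era Java Table API).
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// Register the function by name before using it in an expression string.
tableEnv.registerFunction("accessBasicDBObject", new AccessBasicDBObject());

// The call itself is unquoted; only the key is a string literal.
Table master = table1
        .filter("ns === 'Master'")
        .select("o as master, accessBasicDBObject('applicationId', o) as primaryKey");
```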

-----------------------------------------------
*Amol Suryawanshi*
Java Developer
am...@iprogrammer.com


*iProgrammer Solutions Pvt. Ltd.*



*Office 103, 104, 1st Floor Pride Portal,Shivaji Housing Society,
Bahiratwadi,Near Hotel JW Marriott, Off Senapati Bapat Road, Pune - 411016,
MH, INDIA.**Phone: +91 9689077510 | Skype: amols_iprogrammer*
www.iprogrammer.com <sac...@iprogrammer.com>
------------------------------------------------

On Fri, Jul 27, 2018 at 3:13 PM, Timo Walther <twal...@apache.org> wrote:

> Hi,
>
> I think the exception is self-explanatory: BasicDBObject is not recognized
> as a POJO by Flink. A POJO is required so that the Table API knows the
> types of the fields for the following operations.
>
> The easiest way is to implement your own scalar function, e.g. an
> `accessBasicDBObject(obj, key)`.
>
> Regards,
> Timo
>
>
> Am 27.07.18 um 11:25 schrieb Amol S - iProgrammer:
>
>> Hello Timo,
>>
>> Thanks for the quick reply. With your suggestion the previous exception is
>> gone, but now it is giving me the following exception:
>>
>> Expression 'o.get(_id)' failed on input check: Cannot access field of
>> non-composite type 'GenericType<com.mongodb.BasicDBObject>'.
>>
>>
>> On Fri, Jul 27, 2018 at 1:08 PM, Timo Walther <twal...@apache.org> wrote:
>>
>> Hi Amol,
>>>
>>> the dot operation is reserved for calling functions on fields. If you
>>> want
>>> to get a nested field in the Table API, use the `.get("applicationId")`
>>> operation. See also [1] under "Value access functions".
>>>
>>> Regards,
>>> Timo
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/
>>> dev/table/tableApi.html#built-in-functions
>>>
>>>
>>> Am 27.07.18 um 09:10 schrieb Amol S - iProgrammer:
>>>
>>> Hello Fabian,
>>>>
>>>> I am streaming my MongoDB oplog using Flink and want to use the Flink
>>>> Table API to join multiple tables. My code looks like:
>>>>
>>>> DataStream<Oplog> streamSource = env
>>>>           .addSource(kafkaConsumer)
>>>>           .setParallelism(4);
>>>>
>>>> StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
>>>> // Convert the DataStream into a Table with default fields "f0", "f1"
>>>> Table table1 = tableEnv.fromDataStream(streamSource);
>>>>
>>>> Table master = table1.filter("ns === 'Master'").select("o as master, o.applicationId as primaryKey");
>>>> Table child1 = table1.filter("ns === 'Child1'").select("o as child1, o.applicationId as foreignKey");
>>>> Table child2 = table1.filter("ns === 'Child2'").select("o as child2, o.applicationId as foreignKey2");
>>>>
>>>> Table result = master.join(child1).where("primaryKey==foreignKey").join(child2).where("primaryKey==foreignKey2");
>>>>
>>>> It is throwing the error: "Method threw
>>>> 'org.apache.flink.table.api.ValidationException' exception. Undefined
>>>> function: APPLICATIONID"
>>>>
>>>> public class Oplog implements Serializable{
>>>>       private BasicDBObject o;
>>>> }
>>>>
>>>> Here o is a generic Java type for fetching the MongoDB oplog, and I
>>>> cannot replace this generic type with static POJOs. Please tell me about
>>>> any workaround for this.
>>>>
>>>> BasicDBObject satisfies the following two rules:
>>>>
>>>>    - The class must be public.
>>>>    - It must have a public constructor without arguments (a default
>>>>      constructor).
>>>>
>>>> and we can access class members through basicDBObject.getString("abc").
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>
