[ 
https://issues.apache.org/jira/browse/FLINK-19655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

seunjjs updated FLINK-19655:
----------------------------
    Description: 
My Code here:

{code:java}
EnvironmentSettings bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
bsSettings);
tableEnv.getConfig().setIdleStateRetentionTime(Time.seconds(60), 
Time.seconds(600));

final Table table = tableEnv.from("tableName");
final TableFunction<?> function = table.createTemporalTableFunction(
                    temporalTableEntry.getTimeAttribute(),
                    String.join(",", temporalTableEntry.getPrimaryKeyFields()));
tableEnv.registerFunction(temporalTableEntry.getName(), function);
{code}



And NPE throwed when I executed my program.
{code:java}
java.lang.NullPointerException
        at 
org.apache.flink.table.runtime.operators.join.temporal.BaseTwoInputStreamOperatorWithStateRetention.registerProcessingCleanupTimer(BaseTwoInputStreamOperatorWithStateRetention.java:109)
        at 
org.apache.flink.table.runtime.operators.join.temporal.TemporalProcessTimeJoinOperator.processElement2(TemporalProcessTimeJoinOperator.java:98)
        at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processRecord2(StreamTwoInputProcessor.java:145)
        at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.lambda$new$1(StreamTwoInputProcessor.java:107)
        at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessor.java:362)
        at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
        at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
        at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:185)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
        at java.lang.Thread.run(Thread.java:748)
{code}
    
And When I changed to useOldPlanner, it worked fine.And when I debuged the code 
,I found BaseTwoInputStreamOperatorWithStateRetention#open did not be executed.
Here is BaseTwoInputStreamOperatorWithStateRetention#open code.
{code:java}
public void open() throws Exception {
                initializeTimerService();

                if (stateCleaningEnabled) {
                        ValueStateDescriptor<Long> cleanupStateDescriptor =
                                new ValueStateDescriptor<>(CLEANUP_TIMESTAMP, 
Types.LONG);
                        latestRegisteredCleanupTimer = 
getRuntimeContext().getState(cleanupStateDescriptor);
                }
        }
{code}
Here is TemporalProcessTimeJoinOperator#open code.
{code:java}
public void open() throws Exception {
                this.joinCondition = 
generatedJoinCondition.newInstance(getRuntimeContext().getUserCodeClassLoader());
                FunctionUtils.setFunctionRuntimeContext(joinCondition, 
getRuntimeContext());
                FunctionUtils.openFunction(joinCondition, new Configuration());

                ValueStateDescriptor<BaseRow> rightStateDesc = new 
ValueStateDescriptor<>("right", rightType);
                this.rightState = getRuntimeContext().getState(rightStateDesc);
                this.collector = new TimestampedCollector<>(output);
                this.outRow = new JoinedRow();
                // consider watermark from left stream only.
                super.processWatermark2(Watermark.MAX_WATERMARK);
        }
{code}
I compared the code with oldplaner(TemporalProcessTimeJoin#open).May be 
TemporalProcessTimeJoinOperator#open should add super.open()?
Here is TemporalProcessTimeJoin#open code.
{code:scala}
override def open(): Unit = {
    LOG.debug(s"Compiling FlatJoinFunction: $genJoinFuncName \n\n 
Code:\n$genJoinFuncCode")
    val clazz = compile(
      getRuntimeContext.getUserCodeClassLoader,
      genJoinFuncName,
      genJoinFuncCode)

    LOG.debug("Instantiating FlatJoinFunction.")
    joinFunction = clazz.newInstance()
    FunctionUtils.setFunctionRuntimeContext(joinFunction, getRuntimeContext)
    FunctionUtils.openFunction(joinFunction, new Configuration())

    val rightStateDescriptor = new ValueStateDescriptor[Row]("right", rightType)
    rightState = getRuntimeContext.getState(rightStateDescriptor)

    collector = new TimestampedCollector[CRow](output)
    cRowWrapper = new CRowWrappingCollector()
    cRowWrapper.out = collector

    super.open()
  }
{code}

  was:
My Code here:

{code:java}
EnvironmentSettings bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
bsSettings);
tableEnv.getConfig().setIdleStateRetentionTime(Time.seconds(60), 
Time.seconds(600));

final Table table = tableEnv.from("tableName");
final TableFunction<?> function = table.createTemporalTableFunction(
                    temporalTableEntry.getTimeAttribute(),
                    String.join(",", temporalTableEntry.getPrimaryKeyFields()));
tableEnv.registerFunction(temporalTableEntry.getName(), function);
{code}



And NPE throwed when I executed my program.
{code:java}
java.lang.NullPointerException
        at 
org.apache.flink.table.runtime.operators.join.temporal.BaseTwoInputStreamOperatorWithStateRetention.registerProcessingCleanupTimer(BaseTwoInputStreamOperatorWithStateRetention.java:109)
        at 
org.apache.flink.table.runtime.operators.join.temporal.TemporalProcessTimeJoinOperator.processElement2(TemporalProcessTimeJoinOperator.java:98)
        at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processRecord2(StreamTwoInputProcessor.java:145)
        at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.lambda$new$1(StreamTwoInputProcessor.java:107)
        at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessor.java:362)
        at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
        at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
        at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:185)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
        at java.lang.Thread.run(Thread.java:748)
{code}
    
And When I changed to useOldPlanner, it worked fine.And when I debuged the code 
,I found BaseTwoInputStreamOperatorWithStateRetention#open did not be executed.
Here is BaseTwoInputStreamOperatorWithStateRetention#open code.
{code:java}
public void open() throws Exception {
                initializeTimerService();

                if (stateCleaningEnabled) {
                        ValueStateDescriptor<Long> cleanupStateDescriptor =
                                new ValueStateDescriptor<>(CLEANUP_TIMESTAMP, 
Types.LONG);
                        latestRegisteredCleanupTimer = 
getRuntimeContext().getState(cleanupStateDescriptor);
                }
        }
{code}
Here is TemporalProcessTimeJoinOperator#open code.
{code:java}
public void open() throws Exception {
                this.joinCondition = 
generatedJoinCondition.newInstance(getRuntimeContext().getUserCodeClassLoader());
                FunctionUtils.setFunctionRuntimeContext(joinCondition, 
getRuntimeContext());
                FunctionUtils.openFunction(joinCondition, new Configuration());

                ValueStateDescriptor<BaseRow> rightStateDesc = new 
ValueStateDescriptor<>("right", rightType);
                this.rightState = getRuntimeContext().getState(rightStateDesc);
                this.collector = new TimestampedCollector<>(output);
                this.outRow = new JoinedRow();
                // consider watermark from left stream only.
                super.processWatermark2(Watermark.MAX_WATERMARK);
        }
{code}
I compared the code with oldplaner(TemporalProcessTimeJoin#open).May be 
TemporalProcessTimeJoinOperator#open should add super.open()?
Here is TemporalProcessTimeJoin#open code.
{code:java}
override def open(): Unit = {
    LOG.debug(s"Compiling FlatJoinFunction: $genJoinFuncName \n\n 
Code:\n$genJoinFuncCode")
    val clazz = compile(
      getRuntimeContext.getUserCodeClassLoader,
      genJoinFuncName,
      genJoinFuncCode)

    LOG.debug("Instantiating FlatJoinFunction.")
    joinFunction = clazz.newInstance()
    FunctionUtils.setFunctionRuntimeContext(joinFunction, getRuntimeContext)
    FunctionUtils.openFunction(joinFunction, new Configuration())

    val rightStateDescriptor = new ValueStateDescriptor[Row]("right", rightType)
    rightState = getRuntimeContext.getState(rightStateDescriptor)

    collector = new TimestampedCollector[CRow](output)
    cRowWrapper = new CRowWrappingCollector()
    cRowWrapper.out = collector

    super.open()
  }
{code}


> NPE when using blink planner and TemporalTableFunction after setting 
> IdleStateRetentionTime 
> --------------------------------------------------------------------------------------------
>
>                 Key: FLINK-19655
>                 URL: https://issues.apache.org/jira/browse/FLINK-19655
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.10.0
>            Reporter: seunjjs
>            Priority: Major
>
> My Code here:
> {code:java}
> EnvironmentSettings bsSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
> bsSettings);
> tableEnv.getConfig().setIdleStateRetentionTime(Time.seconds(60), 
> Time.seconds(600));
> final Table table = tableEnv.from("tableName");
> final TableFunction<?> function = table.createTemporalTableFunction(
>                     temporalTableEntry.getTimeAttribute(),
>                     String.join(",", 
> temporalTableEntry.getPrimaryKeyFields()));
> tableEnv.registerFunction(temporalTableEntry.getName(), function);
> {code}
> And NPE throwed when I executed my program.
> {code:java}
> java.lang.NullPointerException
>       at 
> org.apache.flink.table.runtime.operators.join.temporal.BaseTwoInputStreamOperatorWithStateRetention.registerProcessingCleanupTimer(BaseTwoInputStreamOperatorWithStateRetention.java:109)
>       at 
> org.apache.flink.table.runtime.operators.join.temporal.TemporalProcessTimeJoinOperator.processElement2(TemporalProcessTimeJoinOperator.java:98)
>       at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processRecord2(StreamTwoInputProcessor.java:145)
>       at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.lambda$new$1(StreamTwoInputProcessor.java:107)
>       at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessor.java:362)
>       at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
>       at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
>       at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:185)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
>       at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>       at java.lang.Thread.run(Thread.java:748)
> {code}
>     
> And When I changed to useOldPlanner, it worked fine.And when I debuged the 
> code ,I found BaseTwoInputStreamOperatorWithStateRetention#open did not be 
> executed.
> Here is BaseTwoInputStreamOperatorWithStateRetention#open code.
> {code:java}
> public void open() throws Exception {
>               initializeTimerService();
>               if (stateCleaningEnabled) {
>                       ValueStateDescriptor<Long> cleanupStateDescriptor =
>                               new ValueStateDescriptor<>(CLEANUP_TIMESTAMP, 
> Types.LONG);
>                       latestRegisteredCleanupTimer = 
> getRuntimeContext().getState(cleanupStateDescriptor);
>               }
>       }
> {code}
> Here is TemporalProcessTimeJoinOperator#open code.
> {code:java}
> public void open() throws Exception {
>               this.joinCondition = 
> generatedJoinCondition.newInstance(getRuntimeContext().getUserCodeClassLoader());
>               FunctionUtils.setFunctionRuntimeContext(joinCondition, 
> getRuntimeContext());
>               FunctionUtils.openFunction(joinCondition, new Configuration());
>               ValueStateDescriptor<BaseRow> rightStateDesc = new 
> ValueStateDescriptor<>("right", rightType);
>               this.rightState = getRuntimeContext().getState(rightStateDesc);
>               this.collector = new TimestampedCollector<>(output);
>               this.outRow = new JoinedRow();
>               // consider watermark from left stream only.
>               super.processWatermark2(Watermark.MAX_WATERMARK);
>       }
> {code}
> I compared the code with oldplaner(TemporalProcessTimeJoin#open).May be 
> TemporalProcessTimeJoinOperator#open should add super.open()?
> Here is TemporalProcessTimeJoin#open code.
> {code:scala}
> override def open(): Unit = {
>     LOG.debug(s"Compiling FlatJoinFunction: $genJoinFuncName \n\n 
> Code:\n$genJoinFuncCode")
>     val clazz = compile(
>       getRuntimeContext.getUserCodeClassLoader,
>       genJoinFuncName,
>       genJoinFuncCode)
>     LOG.debug("Instantiating FlatJoinFunction.")
>     joinFunction = clazz.newInstance()
>     FunctionUtils.setFunctionRuntimeContext(joinFunction, getRuntimeContext)
>     FunctionUtils.openFunction(joinFunction, new Configuration())
>     val rightStateDescriptor = new ValueStateDescriptor[Row]("right", 
> rightType)
>     rightState = getRuntimeContext.getState(rightStateDescriptor)
>     collector = new TimestampedCollector[CRow](output)
>     cRowWrapper = new CRowWrappingCollector()
>     cRowWrapper.out = collector
>     super.open()
>   }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to