This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e78eca4  [FLINK-20961][table-planner-blink] Fix NPE when no assigned 
timestamp defined in DataStream
e78eca4 is described below

commit e78eca47d2e30b246ddba13faa1f50acdd53f33f
Author: Yuval Itzchakov <[email protected]>
AuthorDate: Mon Jan 25 16:03:58 2021 +0200

    [FLINK-20961][table-planner-blink] Fix NPE when no assigned timestamp 
defined in DataStream
    
    This closes #14735
---
 .../table/planner/codegen/GenerateUtils.scala      | 16 ++--
 .../stream/table/TimeAttributesITCase.scala        | 91 ++++++++++++++++++++++
 2 files changed, 100 insertions(+), 7 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
index 5280203..6d51de3 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
@@ -512,18 +512,20 @@ object GenerateUtils {
       contextTerm: String): GeneratedExpression = {
     val resultType = new TimestampType(true, TimestampKind.ROWTIME, 3)
     val resultTypeTerm = primitiveTypeTermForType(resultType)
-    val Seq(resultTerm, nullTerm) = ctx.addReusableLocalVariables(
+    val Seq(resultTerm, nullTerm, timestamp) = ctx.addReusableLocalVariables(
       (resultTypeTerm, "result"),
-      ("boolean", "isNull"))
+      ("boolean", "isNull"),
+      ("Long", "timestamp"))
 
     val accessCode =
       s"""
-         |$resultTerm = 
$TIMESTAMP_DATA.fromEpochMillis($contextTerm.timestamp());
-         |if ($resultTerm == null) {
-         |  throw new RuntimeException("Rowtime timestamp is null. Please make 
sure that a " +
-         |    "proper TimestampAssigner is defined and the stream environment 
uses the EventTime " +
-         |    "time characteristic.");
+         |$timestamp = $contextTerm.timestamp();
+         |if ($timestamp == null) {
+         |  throw new RuntimeException("Rowtime timestamp is not defined. 
Please make sure that " +
+         |    "a proper TimestampAssigner is defined and the stream 
environment " +
+         |    "uses the EventTime time characteristic.");
          |}
+         |$resultTerm = $TIMESTAMP_DATA.fromEpochMillis($timestamp);
          |$nullTerm = false;
        """.stripMargin.trim
 
diff --git 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/table/TimeAttributesITCase.scala
 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/table/TimeAttributesITCase.scala
new file mode 100644
index 0000000..dd16fc3
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/table/TimeAttributesITCase.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.table
+
+import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, 
WatermarkStrategy}
+import org.apache.flink.api.scala.createTypeInformation
+import org.apache.flink.core.testutils.FlinkMatchers.containsMessage
+import org.apache.flink.runtime.client.JobExecutionException
+import org.apache.flink.table.api._
+import 
org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
+import 
org.apache.flink.table.planner.runtime.utils.{StreamingWithStateTestBase, 
TestingAppendSink}
+import org.apache.flink.types.Row
+import org.junit.Assert.{assertEquals, assertThat, fail}
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import java.time.{Duration, Instant, LocalDateTime, ZoneOffset}
+
+@RunWith(classOf[Parameterized])
+class TimeAttributesITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mode) {
+
+  @Test
+  def testMissingTimeAttributeThrowsCorrectException(): Unit = {
+    val data = List(1L -> "hello", 2L -> "world")
+    val stream = env.fromCollection[(Long, String)](data)
+
+    tEnv.createTemporaryView("test", stream, $"event_time".rowtime(), $"data")
+    val result = tEnv.sqlQuery("SELECT * FROM test")
+
+    val sink = new TestingAppendSink()
+    tEnv.toAppendStream[Row](result).addSink(sink)
+    try {
+      env.execute()
+      fail("should fail")
+    } catch {
+      case t: Throwable =>
+        assertThat(
+          t,
+          containsMessage("Rowtime timestamp is not defined. Please make sure 
that a " +
+            "proper TimestampAssigner is defined and the stream environment 
uses the EventTime " +
+            "time characteristic."))
+    }
+  }
+
+  @Test
+  def testTimestampAttributesWithWatermarkStrategy(): Unit = {
+    val data = List(Instant.now().toEpochMilli -> "hello", 
Instant.now().toEpochMilli -> "world")
+    val stream = env.fromCollection[(Long, 
String)](data).assignTimestampsAndWatermarks(
+      WatermarkStrategy
+        .forBoundedOutOfOrderness[(Long, String)](Duration.ofMinutes(5))
+        .withTimestampAssigner {
+          new SerializableTimestampAssigner[(Long, String)] {
+            override def extractTimestamp(element: (Long, String), 
recordTimestamp: Long): Long =
+              element._1
+          }
+        }
+    )
+
+    tEnv.createTemporaryView("test", stream, $"event_time".rowtime(), $"data")
+    val result = tEnv.sqlQuery("SELECT * FROM test")
+
+    val sink = new TestingAppendSink()
+    tEnv.toAppendStream[Row](result).addSink(sink)
+    env.execute()
+
+    val formattedData = data.map {
+      case (timestamp, data) =>
+        val formattedTimestamp =
+          LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), 
ZoneOffset.UTC).toString
+        s"$formattedTimestamp,$data"
+    }
+    assertEquals(sink.getAppendResults.sorted, formattedData.sorted)
+  }
+}

Reply via email to