wuchong commented on a change in pull request #15195:
URL: https://github.com/apache/flink/pull/15195#discussion_r595727043



##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowJoinUtil.scala
##########
@@ -182,26 +183,65 @@ object WindowJoinUtil {
     }
 
     // Validate join
+    def getLeftFieldNames() = join.getLeft.getRowType.getFieldNames.toList
+
+    def getRightFieldNames() = join.getRight.getRowType.getFieldNames.toList
+
     if (windowStartEqualityLeftKeys.nonEmpty && 
windowEndEqualityLeftKeys.nonEmpty) {
       if (
         leftWindowProperties.getTimeAttributeType != 
rightWindowProperties.getTimeAttributeType) {
+
+        def timeAttributeTypeStr(isRowTime: Boolean): String = {
+          if (isRowTime) "ROWTIME" else "PROCTIME"

Review comment:
       We can just print the logical type, because we will support 
TIMESTAMP_LTZ as a time attribute soon, so the logical type may be different. 

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowJoinUtil.scala
##########
@@ -182,26 +183,65 @@ object WindowJoinUtil {
     }
 
     // Validate join
+    def getLeftFieldNames() = join.getLeft.getRowType.getFieldNames.toList
+
+    def getRightFieldNames() = join.getRight.getRowType.getFieldNames.toList
+
     if (windowStartEqualityLeftKeys.nonEmpty && 
windowEndEqualityLeftKeys.nonEmpty) {
       if (
         leftWindowProperties.getTimeAttributeType != 
rightWindowProperties.getTimeAttributeType) {
+
+        def timeAttributeTypeStr(isRowTime: Boolean): String = {
+          if (isRowTime) "ROWTIME" else "PROCTIME"
+        }
+
         throw new TableException(
           "Currently, window join doesn't support different time attribute 
type of left and " +
             "right inputs.\n" +
-            s"The left time attribute type is 
${leftWindowProperties.getTimeAttributeType}.\n" +
-            s"The right time attribute type is 
${rightWindowProperties.getTimeAttributeType}.")
+            s"The left time attribute type is " +
+            s"${timeAttributeTypeStr(leftWindowProperties.isRowtime)}.\n" +
+            s"The right time attribute type is " +
+            s"${timeAttributeTypeStr(rightWindowProperties.isRowtime)}.")
       } else if (leftWindowProperties.getWindowSpec != 
rightWindowProperties.getWindowSpec) {
+
+        def windowSpecToStr(
+            inputFieldNames: Seq[String],
+            windowStartIdx: Int,
+            windowEndIdx: Int,
+            windowSpec: WindowSpec): String = {
+          val windowing = s"win_start=[${inputFieldNames(windowStartIdx)}]" +
+            s", win_end=[${inputFieldNames(windowEndIdx)}]"
+          windowSpec.toSummaryString(windowing)

Review comment:
       WindowSpec doesn't contain window_start and window_end column 
information, so I think we don't need to print them. 

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowJoinUtil.scala
##########
@@ -182,26 +183,65 @@ object WindowJoinUtil {
     }
 
     // Validate join
+    def getLeftFieldNames() = join.getLeft.getRowType.getFieldNames.toList
+
+    def getRightFieldNames() = join.getRight.getRowType.getFieldNames.toList

Review comment:
       Would a local variable (`val`) be enough here instead of a `def`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to