godfreyhe commented on a change in pull request #7931: [FLINK-11854] [table-planner-blink] Introduce batch physical nodes
URL: https://github.com/apache/flink/pull/7931#discussion_r264168773
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchPhysicalRel.scala
##########
@@ -18,11 +18,59 @@
package org.apache.flink.table.plan.nodes.physical.batch
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.dataformat.BinaryRow
import org.apache.flink.table.plan.nodes.physical.FlinkPhysicalRel
+import org.apache.flink.table.runtime.sort.BinaryIndexedSortable
+import org.apache.flink.table.typeutils.BinaryRowSerializer
+
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+
+import scala.collection.JavaConversions._
/**
 * Common supertype of every relational expression that belongs to the batch physical plan.
 */
trait BatchPhysicalRel extends FlinkPhysicalRel
+
+object BatchPhysicalRel {
+
/**
 * Estimates the average size, in bytes, of one output row of `rel` in [[BinaryRow]] format.
 *
 * Every field whose type fits in the fixed-length part contributes 8 bytes. Every other field
 * contributes 8 bytes for its offset/length slot plus its average column size as reported by the
 * metadata query. When that size is unknown, 16 bytes in total are assumed for the field. The
 * width of the null bit set, computed from the row arity, is added at the end.
 *
 * @param rel the relational expression whose output row size is estimated
 * @return the estimated average binary row size in bytes
 */
private[flink] def binaryRowAverageSize(rel: RelNode): Double = {
  val fieldTypes = FlinkTypeFactory.toInternalRowType(rel.getRowType).getFieldTypes
  // TODO reuse FlinkRelMetadataQuery here
  val mq = rel.getCluster.getMetadataQuery
  // Calcite may return null for the whole list (or for individual entries) when the
  // metadata is not available, so every lookup below is guarded.
  val columnSizes = Option(mq.getAverageColumnSizes(rel))

  // Average size of the column at `index`, if the metadata query knows it.
  def averageColumnSize(index: Int): Option[Double] =
    columnSizes
      .filter(index < _.size())
      .flatMap(sizes => Option(sizes.get(index)))
      .map(_.doubleValue())

  val fieldsSize = fieldTypes.indices.map { index =>
    if (BinaryRow.isInFixedLengthPart(fieldTypes(index))) {
      8d
    } else {
      // The 8 bytes in the fixed-length part store the offset and length of the
      // variable-length part.
      // TODO find a better way of estimating variable-length generic type fields;
      // right now a small value is assumed when the column size is unknown.
      averageColumnSize(index).map(_ + 8d).getOrElse(16d)
    }
  }.sum

  fieldsSize + BinaryRow.calculateBitSetWidthInBytes(fieldTypes.length)
}
+
+ def computeSortMemory(mq: RelMetadataQuery, inputOfSort: RelNode): Double = {
Review comment:
We can move the `binaryRowAverageSize` and `computeSortMemory` methods into the
`FlinkRelMdUtil` class.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services