This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f0e036  [FLINK-11981][table-planner-blink] Introduce 
ProjectionCodeGenerator and HashCodeGenerator for BaseRow (#8019)
1f0e036 is described below

commit 1f0e036bbf6a37bb83623fb62d4900d7c28a5e1d
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Mar 25 14:03:04 2019 +0800

    [FLINK-11981][table-planner-blink] Introduce ProjectionCodeGenerator and 
HashCodeGenerator for BaseRow (#8019)
---
 .../flink/table/codegen/HashCodeGenerator.scala    | 129 +++++++++++++
 .../table/codegen/ProjectionCodeGenerator.scala    | 214 +++++++++++++++++++++
 .../table/codegen/HashCodeGeneratorTest.scala      |  62 ++++++
 .../codegen/ProjectionCodeGeneratorTest.scala      | 110 +++++++++++
 .../flink/table/dataformat/util/HashUtil.java      |  79 ++++++++
 .../table/generated/GeneratedHashFunction.java     |  38 ++++
 .../flink/table/generated/GeneratedProjection.java |  38 ++++
 .../apache/flink/table/generated/HashFunction.java |  31 +++
 8 files changed, 701 insertions(+)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/HashCodeGenerator.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/HashCodeGenerator.scala
new file mode 100644
index 0000000..7de5ad5
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/HashCodeGenerator.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen
+
+import org.apache.flink.table.`type`.{ArrayType, DateType, DecimalType, 
InternalType, InternalTypes, TimestampType}
+import org.apache.flink.table.codegen.CodeGenUtils.{BASE_ROW, newName}
+import org.apache.flink.table.codegen.Indenter.toISC
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.dataformat.util.HashUtil
+import org.apache.flink.table.generated.{GeneratedHashFunction, HashFunction}
+import org.apache.flink.util.MathUtils
+
+/**
+  * CodeGenerator for hash code [[BaseRow]], Calculate a hash value based on 
some fields
+  * of [[BaseRow]].
+  * NOTE: If you need a hash value that is more evenly distributed, call 
[[MathUtils.murmurHash]]
+  * outside to scatter.
+  */
+object HashCodeGenerator {
+
+  /**
+    * A sequence of prime numbers to be used for salting the computed hash 
values.
+    * Based on some empirical evidence, we are using a 32-element subsequence 
of the
+    * OEIS sequence #A068652 (numbers such that every cyclic permutation is a 
prime).
+    *
+    * @see <a href="http://en.wikipedia.org/wiki/List_of_prime_numbers";>
+    *        http://en.wikipedia.org/wiki/List_of_prime_numbers</a>
+    * @see <a href="http://oeis.org/A068652";>http://oeis.org/A068652</a>
+    */
+  val HASH_SALT: Array[Int] = Array[Int](
+    73, 79, 97, 113, 131, 197, 199, 311, 337, 373, 719, 733, 919, 971, 991, 
1193, 1931, 3119,
+    3779, 7793, 7937, 9311, 9377, 11939, 19391, 19937, 37199, 39119, 71993, 
91193, 93719, 93911)
+
+  def generateRowHash(
+      ctx: CodeGeneratorContext,
+      input: InternalType,
+      name: String,
+      hashFields: Array[Int]): GeneratedHashFunction = {
+    val className = newName(name)
+    val baseClass = classOf[HashFunction]
+    val inputTerm = CodeGenUtils.DEFAULT_INPUT1_TERM
+
+    val accessExprs = hashFields.map(
+      idx => GenerateUtils.generateFieldAccess(ctx, input, inputTerm, idx))
+
+    val (hashBody, resultTerm) = generateCodeBody(accessExprs)
+    val code =
+      j"""
+      public class $className implements ${baseClass.getCanonicalName} {
+
+        ${ctx.reuseMemberCode()}
+
+        public $className(Object[] references) throws Exception {
+          ${ctx.reuseInitCode()}
+        }
+
+        @Override
+        public int hashCode($BASE_ROW $inputTerm) {
+          ${ctx.reuseLocalVariableCode()}
+          $hashBody
+          return $resultTerm;
+        }
+      }
+    """.stripMargin
+
+    new GeneratedHashFunction(className, code, ctx.references.toArray)
+  }
+
+  private def generateCodeBody(accessExprs: Seq[GeneratedExpression]): 
(String, String) = {
+    val hashIntTerm = CodeGenUtils.newName("hashCode")
+    var i = -1
+    val hashBodyCode = accessExprs.map((expr) => {
+      i = i + 1
+      s"""
+         |$hashIntTerm *= ${HASH_SALT(i & 0x1F)};
+         |${expr.code}
+         |if (!${expr.nullTerm}) {
+         | $hashIntTerm += ${hashExpr(expr)};
+         |}
+         |""".stripMargin
+
+    }).mkString("\n")
+    (s"""
+        |int $hashIntTerm = 0;
+        |$hashBodyCode""".stripMargin, hashIntTerm)
+  }
+
+  private def hashExpr(expr: GeneratedExpression): String = {
+    val util = classOf[HashUtil].getCanonicalName
+    s"$util.hash${hashNameInHashUtil(expr.resultType)}(${expr.resultTerm})"
+  }
+
+  private def hashNameInHashUtil(t: InternalType): String = t match {
+    case InternalTypes.INT => "Int"
+    case InternalTypes.LONG => "Long"
+    case InternalTypes.SHORT => "Short"
+    case InternalTypes.BYTE => "Byte"
+    case InternalTypes.FLOAT => "Float"
+    case InternalTypes.DOUBLE => "Double"
+    case InternalTypes.BOOLEAN => "Boolean"
+    case InternalTypes.CHAR => "Char"
+    case InternalTypes.STRING => "String"
+    // decimal
+    case _: DecimalType => "Decimal"
+    // Sql time is Unboxing.
+    case _: DateType => "Int"
+    case InternalTypes.TIME => "Int"
+    case _: TimestampType => "Long"
+    case InternalTypes.BINARY => "Binary"
+    case _: ArrayType => throw new IllegalArgumentException(s"Not support type 
to hash: $t")
+    case _ => "Object"
+  }
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/ProjectionCodeGenerator.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/ProjectionCodeGenerator.scala
new file mode 100644
index 0000000..095699a
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/ProjectionCodeGenerator.scala
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen
+
+import org.apache.flink.table.`type`.{InternalType, RowType}
+import org.apache.flink.table.codegen.CodeGenUtils._
+import org.apache.flink.table.codegen.GenerateUtils.generateRecordStatement
+import org.apache.flink.table.codegen.GeneratedExpression.{NEVER_NULL, NO_CODE}
+import org.apache.flink.table.dataformat._
+import org.apache.flink.table.generated.{GeneratedProjection, Projection}
+
+import scala.collection.mutable
+
+/**
+  * CodeGenerator for projection, Take out some fields of [[BaseRow]] to 
generate
+  * a new [[BaseRow]].
+  */
+object ProjectionCodeGenerator {
+
+  // for loop optimization will only be enabled
+  // if the number of fields to be project exceeds the limit
+  //
+  // this value is tuned by hand according to projecting
+  // some string type fields with randomized order
+  val FOR_LOOP_FIELD_LIMIT: Int = 25
+
+  def generateProjectionExpression(
+      ctx: CodeGeneratorContext,
+      inType: RowType,
+      outType: RowType,
+      inputMapping: Array[Int],
+      outClass: Class[_ <: BaseRow] = classOf[BinaryRow],
+      inputTerm: String = DEFAULT_INPUT1_TERM,
+      outRecordTerm: String = DEFAULT_OUT_RECORD_TERM,
+      outRecordWriterTerm: String = DEFAULT_OUT_RECORD_WRITER_TERM,
+      reusedOutRecord: Boolean = true,
+      nullCheck: Boolean = true): GeneratedExpression = {
+
+    // we use a for loop to do all the projections for the same field type
+    // instead of generating separated code for each field.
+    // when the number of fields of the same type is large, this can improve 
performance.
+    def generateLoop(
+        fieldType: InternalType,
+        inIdxs: mutable.ArrayBuffer[Int],
+        outIdxs: mutable.ArrayBuffer[Int]): String = {
+      // this array contains the indices of the fields
+      // whose type equals to `fieldType` in the input row
+      val inIdxArr = newName("inIdx")
+      ctx.addReusableMember(s"int[] $inIdxArr = null;")
+      ctx.addReusableInitStatement(s"$inIdxArr = new int[] 
{${inIdxs.mkString(", ")}};")
+
+      // this array contains the indices of the fields
+      // whose type equals to `fieldType` in the output row
+      val outIdxArr = newName("outIdx")
+      ctx.addReusableMember(s"int[] $outIdxArr = null;")
+      ctx.addReusableInitStatement(s"$outIdxArr = new int[] 
{${outIdxs.mkString(", ")}};")
+
+      val loopIdx = newName("i")
+
+      val fieldVal = CodeGenUtils.baseRowFieldReadAccess(
+        ctx, s"$inIdxArr[$loopIdx]", inputTerm, fieldType)
+
+      val inIdx = s"$inIdxArr[$loopIdx]"
+      val outIdx = s"$outIdxArr[$loopIdx]"
+      val nullTerm = s"$inputTerm.isNullAt($inIdx)"
+      s"""
+         |for (int $loopIdx = 0; $loopIdx < $inIdxArr.length; $loopIdx++) {
+         |  ${CodeGenUtils.baseRowSetField(ctx, outClass, outRecordTerm, 
outIdx,
+                GeneratedExpression(fieldVal, nullTerm, "", fieldType),
+                Some(outRecordWriterTerm))}
+         |}
+       """.stripMargin
+    }
+
+    val outFieldTypes = outType.getFieldTypes
+    val typeIdxs = new mutable.HashMap[
+      InternalType,
+      (mutable.ArrayBuffer[Int], mutable.ArrayBuffer[Int])]()
+
+    for (i <- outFieldTypes.indices) {
+      val (inIdxs, outIdxs) = typeIdxs.getOrElseUpdate(
+        outFieldTypes(i), (mutable.ArrayBuffer.empty[Int], 
mutable.ArrayBuffer.empty[Int]))
+      inIdxs.append(inputMapping(i))
+      outIdxs.append(i)
+    }
+
+    val codeBuffer = mutable.ArrayBuffer.empty[String]
+    for ((fieldType, (inIdxs, outIdxs)) <- typeIdxs) {
+      if (inIdxs.length >= FOR_LOOP_FIELD_LIMIT) {
+        // for loop optimization will only be enabled
+        // if the number of fields to be project exceeds the limit
+        codeBuffer.append(generateLoop(fieldType, inIdxs, outIdxs))
+      } else {
+        // otherwise we do not use for loop
+        for (i <- inIdxs.indices) {
+          val nullTerm = s"$inputTerm.isNullAt(${inIdxs(i)})"
+          codeBuffer.append(
+            CodeGenUtils.baseRowSetField(ctx, outClass, outRecordTerm, 
outIdxs(i).toString,
+              GeneratedExpression(baseRowFieldReadAccess(
+                ctx, inIdxs(i), inputTerm, fieldType), nullTerm, "", 
fieldType),
+              Some(outRecordWriterTerm)))
+        }
+      }
+    }
+
+    val setFieldsCode = codeBuffer.mkString("\n")
+
+    val outRowInitCode ={
+      val initCode = generateRecordStatement(
+        outType, outClass, outRecordTerm, Some(outRecordWriterTerm))
+      if (reusedOutRecord) {
+        ctx.addReusableMember(initCode)
+        NO_CODE
+      } else {
+        initCode
+      }
+    }
+
+    val code = if (outClass == classOf[BinaryRow]) {
+      val writer = outRecordWriterTerm
+      val resetWriter = if (ctx.nullCheck) s"$writer.reset();" else 
s"$writer.resetCursor();"
+      val completeWriter: String = s"$writer.complete();"
+      s"""
+         |$outRowInitCode
+         |$resetWriter
+         |$setFieldsCode
+         |$completeWriter
+        """.stripMargin
+    } else {
+      s"""
+         |$outRowInitCode
+         |$setFieldsCode
+        """.stripMargin
+    }
+    GeneratedExpression(outRecordTerm, NEVER_NULL, code, outType)
+  }
+
+  /**
+    * CodeGenerator for projection.
+    * @param reusedOutRecord If objects or variables can be reused, they will 
be added a reusable
+    * output record to the member area of the generated class. If not they 
will be as temp
+    * variables.
+    * @return
+    */
+  def generateProjection(
+      ctx: CodeGeneratorContext,
+      name: String,
+      inType: RowType,
+      outType: RowType,
+      inputMapping: Array[Int],
+      outClass: Class[_ <: BaseRow] = classOf[BinaryRow],
+      inputTerm: String = DEFAULT_INPUT1_TERM,
+      outRecordTerm: String = DEFAULT_OUT_RECORD_TERM,
+      outRecordWriterTerm: String = DEFAULT_OUT_RECORD_WRITER_TERM,
+      reusedOutRecord: Boolean = true,
+      nullCheck: Boolean = true): GeneratedProjection = {
+    val className = newName(name)
+    val baseClass = classOf[Projection[_, _]]
+
+    val expression = generateProjectionExpression(
+      ctx, inType, outType, inputMapping, outClass,
+      inputTerm, outRecordTerm, outRecordWriterTerm, reusedOutRecord, 
nullCheck)
+
+    val code =
+      s"""
+         |public class $className implements ${
+            baseClass.getCanonicalName}<$BASE_ROW, 
${outClass.getCanonicalName}> {
+         |
+         |  ${ctx.reuseMemberCode()}
+         |
+         |  public $className(Object[] references) throws Exception {
+         |    ${ctx.reuseInitCode()}
+         |  }
+         |
+         |  @Override
+         |  public ${outClass.getCanonicalName} apply($BASE_ROW $inputTerm) {
+         |    ${ctx.reuseLocalVariableCode()}
+         |    ${expression.code}
+         |    return ${expression.resultTerm};
+         |  }
+         |}
+        """.stripMargin
+
+    new GeneratedProjection(className, code, ctx.references.toArray)
+  }
+
+  /**
+    * For java invoke.
+    */
+  def generateProjection(
+      ctx: CodeGeneratorContext,
+      name: String,
+      inputType: RowType,
+      outputType: RowType,
+      inputMapping: Array[Int]): GeneratedProjection =
+    generateProjection(
+      ctx, name, inputType, outputType, inputMapping, inputTerm = 
DEFAULT_INPUT1_TERM)
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/codegen/HashCodeGeneratorTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/codegen/HashCodeGeneratorTest.scala
new file mode 100644
index 0000000..9ca565c
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/codegen/HashCodeGeneratorTest.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen
+
+import org.apache.flink.table.`type`.{InternalTypes, RowType}
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.dataformat.GenericRow
+
+import org.junit.{Assert, Test}
+
+/**
+  * Test for [[HashCodeGenerator]].
+  */
+class HashCodeGeneratorTest {
+
+  private val classLoader = Thread.currentThread().getContextClassLoader
+
+  @Test
+  def testHash(): Unit = {
+    val hashFunc1 = HashCodeGenerator.generateRowHash(
+      new CodeGeneratorContext(new TableConfig),
+      new RowType(InternalTypes.INT, InternalTypes.LONG, InternalTypes.BINARY),
+      "name",
+      Array(1, 0)
+    ).newInstance(classLoader)
+
+    val hashFunc2 = HashCodeGenerator.generateRowHash(
+      new CodeGeneratorContext(new TableConfig),
+      new RowType(InternalTypes.INT, InternalTypes.LONG, InternalTypes.BINARY),
+      "name",
+      Array(1, 2, 0)
+    ).newInstance(classLoader)
+
+    val row = GenericRow.of(ji(5), jl(8), Array[Byte](1, 5, 6))
+    Assert.assertEquals(637, hashFunc1.hashCode(row))
+    Assert.assertEquals(136516167, hashFunc2.hashCode(row))
+  }
+
+  def ji(i: Int): Integer = {
+    new Integer(i)
+  }
+
+  def jl(l: Long): java.lang.Long = {
+    new java.lang.Long(l)
+  }
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/codegen/ProjectionCodeGeneratorTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/codegen/ProjectionCodeGeneratorTest.scala
new file mode 100644
index 0000000..467c0b8
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/codegen/ProjectionCodeGeneratorTest.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen
+
+import org.apache.flink.table.`type`.{InternalTypes, RowType}
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.dataformat.{BaseRow, BinaryRow, GenericRow}
+import org.apache.flink.table.generated.Projection
+
+import org.junit.{Assert, Test}
+
+import scala.util.Random
+
+/**
+  * Test for [[ProjectionCodeGenerator]].
+  */
+class ProjectionCodeGeneratorTest {
+
+  private val classLoader = Thread.currentThread().getContextClassLoader
+
+  @Test
+  def testProjectionBinaryRow(): Unit = {
+    val projection = ProjectionCodeGenerator.generateProjection(
+      new CodeGeneratorContext(new TableConfig),
+      "name",
+      new RowType(InternalTypes.INT, InternalTypes.LONG),
+      new RowType(InternalTypes.LONG, InternalTypes.INT),
+      Array(1, 0)
+    ).newInstance(classLoader).asInstanceOf[Projection[BaseRow, BinaryRow]]
+    val row: BinaryRow = projection.apply(GenericRow.of(ji(5), jl(8)))
+    Assert.assertEquals(5, row.getInt(1))
+    Assert.assertEquals(8, row.getLong(0))
+  }
+
+  @Test
+  def testProjectionGenericRow(): Unit = {
+    val projection = ProjectionCodeGenerator.generateProjection(
+      new CodeGeneratorContext(new TableConfig),
+      "name",
+      new RowType(InternalTypes.INT, InternalTypes.LONG),
+      new RowType(InternalTypes.LONG, InternalTypes.INT),
+      Array(1, 0),
+      outClass = classOf[GenericRow]
+    ).newInstance(classLoader).asInstanceOf[Projection[BaseRow, GenericRow]]
+    val row: GenericRow = projection.apply(GenericRow.of(ji(5), jl(8)))
+    Assert.assertEquals(5, row.getInt(1))
+    Assert.assertEquals(8, row.getLong(0))
+  }
+
+  @Test
+  def testProjectionManyField(): Unit = {
+    val rowType = new RowType((0 until 100).map(_ => 
InternalTypes.INT).toArray: _*)
+    val projection = ProjectionCodeGenerator.generateProjection(
+      new CodeGeneratorContext(new TableConfig),
+      "name",
+      rowType,
+      rowType,
+      (0 until 100).toArray
+    ).newInstance(classLoader).asInstanceOf[Projection[BaseRow, BinaryRow]]
+    val rnd = new Random()
+    val input = GenericRow.of((0 until 100).map(_ => 
ji(rnd.nextInt())).toArray: _*)
+    val row = projection.apply(input)
+    for (i <- 0 until 100) {
+      Assert.assertEquals(input.getInt(i), row.getInt(i))
+    }
+  }
+
+  @Test
+  def testProjectionManyFieldGenericRow(): Unit = {
+    val rowType = new RowType((0 until 100).map(_ => 
InternalTypes.INT).toArray: _*)
+    val projection = ProjectionCodeGenerator.generateProjection(
+      new CodeGeneratorContext(new TableConfig),
+      "name",
+      rowType,
+      rowType,
+      (0 until 100).toArray,
+      outClass = classOf[GenericRow]
+    ).newInstance(classLoader).asInstanceOf[Projection[BaseRow, GenericRow]]
+    val rnd = new Random()
+    val input = GenericRow.of((0 until 100).map(_ => 
ji(rnd.nextInt())).toArray: _*)
+    val row = projection.apply(input)
+    for (i <- 0 until 100) {
+      Assert.assertEquals(input.getInt(i), row.getInt(i))
+    }
+  }
+
+  def ji(i: Int): Integer = {
+    new Integer(i)
+  }
+
+  def jl(l: Long): java.lang.Long = {
+    new java.lang.Long(l)
+  }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/util/HashUtil.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/util/HashUtil.java
new file mode 100644
index 0000000..4950926
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/util/HashUtil.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat.util;
+
+import org.apache.flink.table.dataformat.BinaryString;
+import org.apache.flink.table.dataformat.Decimal;
+import org.apache.flink.table.util.MurmurHashUtil;
+
+import static 
org.apache.flink.table.dataformat.util.BinaryRowUtil.BYTE_ARRAY_BASE_OFFSET;
+
+/**
+ * Util for hash code of data format.
+ */
+public class HashUtil {
+
+       public static int hashInt(int value) {
+               return Integer.hashCode(value);
+       }
+
+       public static int hashLong(long value) {
+               return Long.hashCode(value);
+       }
+
+       public static int hashShort(short value) {
+               return Short.hashCode(value);
+       }
+
+       public static int hashByte(byte value) {
+               return Byte.hashCode(value);
+       }
+
+       public static int hashFloat(float value) {
+               return Float.hashCode(value);
+       }
+
+       public static int hashDouble(double value) {
+               return Double.hashCode(value);
+       }
+
+       public static int hashBoolean(boolean value) {
+               return Boolean.hashCode(value);
+       }
+
+       public static int hashChar(char value) {
+               return Character.hashCode(value);
+       }
+
+       public static int hashObject(Object value) {
+               return value.hashCode();
+       }
+
+       public static int hashString(BinaryString value) {
+               return value.hashCode();
+       }
+
+       public static int hashDecimal(Decimal value) {
+               return value.hashCode();
+       }
+
+       public static int hashBinary(byte[] value) {
+               return MurmurHashUtil.hashUnsafeBytes(value, 
BYTE_ARRAY_BASE_OFFSET, value.length);
+       }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedHashFunction.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedHashFunction.java
new file mode 100644
index 0000000..2049074
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedHashFunction.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.generated;
+
+/**
+ * Describes a generated {@link HashFunction}.
+ */
+public final class GeneratedHashFunction extends GeneratedClass<HashFunction> {
+
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * Creates a GeneratedHashFunction.
+        *
+        * @param className class name of the generated Function.
+        * @param code code of the generated Function.
+        * @param references referenced objects of the generated Function.
+        */
+       public GeneratedHashFunction(String className, String code, Object[] 
references) {
+               super(className, code, references);
+       }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedProjection.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedProjection.java
new file mode 100644
index 0000000..a0c2246
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedProjection.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.generated;
+
+/**
+ * Describes a generated {@link Projection}.
+ */
+public final class GeneratedProjection extends GeneratedClass<Projection> {
+
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * Creates a GeneratedProjection.
+        *
+        * @param className class name of the generated Function.
+        * @param code code of the generated Function.
+        * @param references referenced objects of the generated Function.
+        */
+       public GeneratedProjection(String className, String code, Object[] 
references) {
+               super(className, code, references);
+       }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/HashFunction.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/HashFunction.java
new file mode 100644
index 0000000..19d1e2f
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/HashFunction.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.generated;
+
+import org.apache.flink.table.dataformat.BaseRow;
+
+/**
+ * Interface for code generated hash code of {@link BaseRow}, which will 
select some
+ * fields to hash.
+ */
+public interface HashFunction {
+
+       int hashCode(BaseRow row);
+
+}

Reply via email to